feat: add clickhouse database bridge

This commit adds a Clickhouse bridge to EMQX 5. The bridge is similar to
the Clickhouse bridge in the 4.4, but adds the possibility to use
different formats (such as JSON) for values to be inserted.
This commit is contained in:
Kjell Winblad 2023-01-17 14:07:56 +01:00
parent 9fb74bfc87
commit 67acdf0888
19 changed files with 2082 additions and 7 deletions

View File

@ -0,0 +1,678 @@
<?xml version="1.0"?>
<!--
NOTE: User and query level settings are set up in "users.xml" file.
If you have accidentally specified user-level settings here, the server won't start.
You can either move the settings to the right place inside "users.xml" file
or add <skip_check_for_incorrect_settings>1</skip_check_for_incorrect_settings> here.
-->
<yandex>
<logger>
<!-- Possible levels: https://github.com/pocoproject/poco/blob/poco-1.9.4-release/Foundation/include/Poco/Logger.h#L105 -->
<level>trace</level>
<log>/var/log/clickhouse-server/clickhouse-server.log</log>
<errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<!-- <console>1</console> --> <!-- Default behavior is autodetection (log to console if not daemon mode and is tty) -->
<!-- Per level overrides (legacy):
For example to suppress logging of the ConfigReloader you can use:
NOTE: levels.logger is reserved, see below.
-->
<!--
<levels>
<ConfigReloader>none</ConfigReloader>
</levels>
-->
<!-- Per level overrides:
For example to suppress logging of the RBAC for default user you can use:
(But please note that the logger name may be changed from version to version, even after a minor upgrade)
-->
<!--
<levels>
<logger>
<name>ContextAccess (default)</name>
<level>none</level>
</logger>
<logger>
<name>DatabaseOrdinary (test)</name>
<level>none</level>
</logger>
</levels>
-->
</logger>
<send_crash_reports>
<!-- Changing <enabled> to true allows sending crash reports to -->
<!-- the ClickHouse core developers team via Sentry https://sentry.io -->
<!-- Doing so at least in pre-production environments is highly appreciated -->
<enabled>false</enabled>
<!-- Change <anonymize> to true if you don't feel comfortable attaching the server hostname to the crash report -->
<anonymize>false</anonymize>
<!-- Default endpoint should be changed to different Sentry DSN only if you have -->
<!-- some in-house engineers or hired consultants who're going to debug ClickHouse issues for you -->
<endpoint>https://6f33034cfe684dd7a3ab9875e57b1c8d@o388870.ingest.sentry.io/5226277</endpoint>
</send_crash_reports>
<!--display_name>production</display_name--> <!-- It is the name that will be shown in the client -->
<http_port>8123</http_port>
<tcp_port>9000</tcp_port>
<mysql_port>9004</mysql_port>
<!-- For HTTPS and SSL over native protocol. -->
<!--
<https_port>8443</https_port>
<tcp_port_secure>9440</tcp_port_secure>
-->
<!-- Used with https_port and tcp_port_secure. Full ssl options list: https://github.com/ClickHouse-Extras/poco/blob/master/NetSSL_OpenSSL/include/Poco/Net/SSLManager.h#L71 -->
<openSSL>
<server> <!-- Used for https server AND secure tcp port -->
<!-- openssl req -subj "/CN=localhost" -new -newkey rsa:2048 -days 365 -nodes -x509 -keyout /etc/clickhouse-server/server.key -out /etc/clickhouse-server/server.crt -->
<certificateFile>/etc/clickhouse-server/server.crt</certificateFile>
<privateKeyFile>/etc/clickhouse-server/server.key</privateKeyFile>
<!-- openssl dhparam -out /etc/clickhouse-server/dhparam.pem 4096 -->
<dhParamsFile>/etc/clickhouse-server/dhparam.pem</dhParamsFile>
<verificationMode>none</verificationMode>
<loadDefaultCAFile>true</loadDefaultCAFile>
<cacheSessions>true</cacheSessions>
<disableProtocols>sslv2,sslv3</disableProtocols>
<preferServerCiphers>true</preferServerCiphers>
</server>
<client> <!-- Used for connecting to https dictionary source and secured Zookeeper communication -->
<loadDefaultCAFile>true</loadDefaultCAFile>
<cacheSessions>true</cacheSessions>
<disableProtocols>sslv2,sslv3</disableProtocols>
<preferServerCiphers>true</preferServerCiphers>
<!-- Use for self-signed: <verificationMode>none</verificationMode> -->
<invalidCertificateHandler>
<!-- Use for self-signed: <name>AcceptCertificateHandler</name> -->
<name>RejectCertificateHandler</name>
</invalidCertificateHandler>
</client>
</openSSL>
<!-- Default root page on http[s] server. For example load UI from https://tabix.io/ when opening http://localhost:8123 -->
<!--
<http_server_default_response><![CDATA[<html ng-app="SMI2"><head><base href="http://ui.tabix.io/"></head><body><div ui-view="" class="content-ui"></div><script src="http://loader.tabix.io/master.js"></script></body></html>]]></http_server_default_response>
-->
<!-- Port for communication between replicas. Used for data exchange. -->
<interserver_http_port>9009</interserver_http_port>
<!-- Hostname that is used by other replicas to request this server.
If not specified, then it is determined analogously to the 'hostname -f' command.
This setting could be used to switch replication to another network interface.
-->
<!--
<interserver_http_host>example.yandex.ru</interserver_http_host>
-->
<!-- Listen specified host. use :: (wildcard IPv6 address), if you want to accept connections both with IPv4 and IPv6 from everywhere. -->
<!-- <listen_host>::</listen_host> -->
<!-- Same for hosts with disabled ipv6: -->
<!-- <listen_host>0.0.0.0</listen_host> -->
<!-- Default values - try listen localhost on ipv4 and ipv6: -->
<!--
<listen_host>::1</listen_host>
<listen_host>127.0.0.1</listen_host>
-->
<!-- Don't exit if ipv6 or ipv4 unavailable, but listen_host with this protocol specified -->
<!-- <listen_try>0</listen_try> -->
<!-- Allow listen on same address:port -->
<!-- <listen_reuse_port>0</listen_reuse_port> -->
<!-- <listen_backlog>64</listen_backlog> -->
<max_connections>4096</max_connections>
<keep_alive_timeout>3</keep_alive_timeout>
<!-- Maximum number of concurrent queries. -->
<max_concurrent_queries>100</max_concurrent_queries>
<!-- Maximum memory usage (resident set size) for server process.
Zero value or unset means default. Default is "max_server_memory_usage_to_ram_ratio" of available physical RAM.
If the value is larger than "max_server_memory_usage_to_ram_ratio" of available physical RAM, it will be cut down.
The constraint is checked on query execution time.
If a query tries to allocate memory and the current memory usage plus allocation is greater
than specified threshold, exception will be thrown.
It is not practical to set this constraint to small values like just a few gigabytes,
because memory allocator will keep this amount of memory in caches and the server will deny service of queries.
-->
<max_server_memory_usage>0</max_server_memory_usage>
<!-- Maximum number of threads in the Global thread pool.
This will default to a maximum of 10000 threads if not specified.
This setting will be useful in scenarios where there are a large number
of distributed queries that are running concurrently but are idling most
of the time, in which case a higher number of threads might be required.
-->
<max_thread_pool_size>10000</max_thread_pool_size>
<!-- On memory constrained environments you may have to set this to a value larger than 1.
-->
<max_server_memory_usage_to_ram_ratio>10</max_server_memory_usage_to_ram_ratio>
<!-- Simple server-wide memory profiler. Collect a stack trace at every peak allocation step (in bytes).
Data will be stored in system.trace_log table with query_id = empty string.
Zero means disabled.
-->
<total_memory_profiler_step>4194304</total_memory_profiler_step>
<!-- Collect random allocations and deallocations and write them into system.trace_log with 'MemorySample' trace_type.
The probability is for every alloc/free regardless to the size of the allocation.
Note that sampling happens only when the amount of untracked memory exceeds the untracked memory limit,
which is 4 MiB by default but can be lowered if 'total_memory_profiler_step' is lowered.
You may want to set 'total_memory_profiler_step' to 1 for extra fine grained sampling.
-->
<total_memory_tracker_sample_probability>0</total_memory_tracker_sample_probability>
<!-- Set limit on number of open files (default: maximum). This setting makes sense on Mac OS X because getrlimit() fails to retrieve
correct maximum value. -->
<!-- <max_open_files>262144</max_open_files> -->
<!-- Size of cache of uncompressed blocks of data, used in tables of MergeTree family.
In bytes. Cache is single for server. Memory is allocated only on demand.
Cache is used when 'use_uncompressed_cache' user setting turned on (off by default).
Uncompressed cache is advantageous only for very short queries and in rare cases.
-->
<uncompressed_cache_size>8589934592</uncompressed_cache_size>
<!-- Approximate size of mark cache, used in tables of MergeTree family.
In bytes. Cache is single for server. Memory is allocated only on demand.
You should not lower this value.
-->
<mark_cache_size>5368709120</mark_cache_size>
<!-- Path to data directory, with trailing slash. -->
<path>/var/lib/clickhouse/</path>
<!-- Path to temporary data for processing hard queries. -->
<tmp_path>/var/lib/clickhouse/tmp/</tmp_path>
<!-- Policy from the <storage_configuration> for the temporary files.
If not set <tmp_path> is used, otherwise <tmp_path> is ignored.
Notes:
- move_factor is ignored
- keep_free_space_bytes is ignored
- max_data_part_size_bytes is ignored
- you must have exactly one volume in that policy
-->
<!-- <tmp_policy>tmp</tmp_policy> -->
<!-- Directory with user provided files that are accessible by 'file' table function. -->
<user_files_path>/var/lib/clickhouse/user_files/</user_files_path>
<!-- Path to folder where users and roles created by SQL commands are stored. -->
<access_control_path>/var/lib/clickhouse/access/</access_control_path>
<!-- Path to configuration file with users, access rights, profiles of settings, quotas. -->
<users_config>/etc/clickhouse-server/users.xml</users_config>
<!-- Default profile of settings. -->
<default_profile>default</default_profile>
<!-- System profile of settings. These settings are used by internal processes (Buffer storage, Distributed DDL worker and so on). -->
<!-- <system_profile>default</system_profile> -->
<!-- Default database. -->
<default_database>default</default_database>
<!-- Server time zone could be set here.
Time zone is used when converting between String and DateTime types,
when printing DateTime in text formats and parsing DateTime from text,
it is used in date and time related functions, if specific time zone was not passed as an argument.
Time zone is specified as identifier from IANA time zone database, like UTC or Africa/Abidjan.
If not specified, system time zone at server startup is used.
Please note, that server could display time zone alias instead of specified name.
Example: W-SU is an alias for Europe/Moscow and Zulu is an alias for UTC.
-->
<!-- <timezone>Europe/Moscow</timezone> -->
<!-- You can specify umask here (see "man umask"). Server will apply it on startup.
Number is always parsed as octal. Default umask is 027 (other users cannot read logs, data files, etc; group can only read).
-->
<!-- <umask>022</umask> -->
<!-- Perform mlockall after startup to lower first queries latency
and to prevent clickhouse executable from being paged out under high IO load.
Enabling this option is recommended but will lead to increased startup time for up to a few seconds.
-->
<mlock_executable>true</mlock_executable>
<!-- Configuration of clusters that could be used in Distributed tables.
https://clickhouse.tech/docs/en/operations/table_engines/distributed/
-->
<remote_servers incl="clickhouse_remote_servers" >
<!-- Test only shard config for testing distributed storage -->
<test_shard_localhost>
<shard>
<!-- Optional. Whether to write data to just one of the replicas. Default: false (write data to all replicas). -->
<!-- <internal_replication>false</internal_replication> -->
<!-- Optional. Shard weight when writing data. Default: 1. -->
<!-- <weight>1</weight> -->
<replica>
<host>localhost</host>
<port>9000</port>
<!-- Optional. Priority of the replica for load_balancing. Default: 1 (less value has more priority). -->
<!-- <priority>1</priority> -->
</replica>
</shard>
</test_shard_localhost>
<test_cluster_two_shards_localhost>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
</test_cluster_two_shards_localhost>
<test_cluster_two_shards>
<shard>
<replica>
<host>127.0.0.1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>127.0.0.2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster_two_shards>
<test_shard_localhost_secure>
<shard>
<replica>
<host>localhost</host>
<port>9440</port>
<secure>1</secure>
</replica>
</shard>
</test_shard_localhost_secure>
<test_unavailable_shard>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>localhost</host>
<port>1</port>
</replica>
</shard>
</test_unavailable_shard>
</remote_servers>
<!-- The list of hosts allowed to use in URL-related storage engines and table functions.
If this section is not present in configuration, all hosts are allowed.
-->
<remote_url_allow_hosts>
<!-- Host should be specified exactly as in URL. The name is checked before DNS resolution.
Example: "yandex.ru", "yandex.ru." and "www.yandex.ru" are different hosts.
If port is explicitly specified in URL, the host:port is checked as a whole.
If host specified here without port, any port with this host allowed.
"yandex.ru" -> "yandex.ru:443", "yandex.ru:80" etc. is allowed, but "yandex.ru:80" -> only "yandex.ru:80" is allowed.
If the host is specified as IP address, it is checked as specified in URL. Example: "[2a02:6b8:a::a]".
If there are redirects and support for redirects is enabled, every redirect (the Location field) is checked.
-->
<!-- Regular expression can be specified. RE2 engine is used for regexps.
Regexps are not aligned: don't forget to add ^ and $. Also don't forget to escape dot (.) metacharacter
(forgetting to do so is a common source of error).
-->
</remote_url_allow_hosts>
<!-- If an element has the 'incl' attribute, then the corresponding substitution from another file will be used as its value.
By default, path to file with substitutions is /etc/metrika.xml. It could be changed in config in 'include_from' element.
Values for substitutions are specified in /yandex/name_of_substitution elements in that file.
-->
<!-- ZooKeeper is used to store metadata about replicas, when using Replicated tables.
Optional. If you don't use replicated tables, you could omit that.
See https://clickhouse.yandex/docs/en/table_engines/replication/
-->
<zookeeper incl="zookeeper-servers" optional="true" />
<!-- Substitutions for parameters of replicated tables.
Optional. If you don't use replicated tables, you could omit that.
See https://clickhouse.yandex/docs/en/table_engines/replication/#creating-replicated-tables
-->
<macros incl="macros" optional="true" />
<!-- Reloading interval for embedded dictionaries, in seconds. Default: 3600. -->
<builtin_dictionaries_reload_interval>3600</builtin_dictionaries_reload_interval>
<!-- Maximum session timeout, in seconds. Default: 3600. -->
<max_session_timeout>3600</max_session_timeout>
<!-- Default session timeout, in seconds. Default: 60. -->
<default_session_timeout>60</default_session_timeout>
<!-- Sending data to Graphite for monitoring. Several sections can be defined. -->
<!--
interval - send every X second
root_path - prefix for keys
hostname_in_path - append hostname to root_path (default = true)
metrics - send data from table system.metrics
events - send data from table system.events
asynchronous_metrics - send data from table system.asynchronous_metrics
-->
<!--
<graphite>
<host>localhost</host>
<port>42000</port>
<timeout>0.1</timeout>
<interval>60</interval>
<root_path>one_min</root_path>
<hostname_in_path>true</hostname_in_path>
<metrics>true</metrics>
<events>true</events>
<events_cumulative>false</events_cumulative>
<asynchronous_metrics>true</asynchronous_metrics>
</graphite>
<graphite>
<host>localhost</host>
<port>42000</port>
<timeout>0.1</timeout>
<interval>1</interval>
<root_path>one_sec</root_path>
<metrics>true</metrics>
<events>true</events>
<events_cumulative>false</events_cumulative>
<asynchronous_metrics>false</asynchronous_metrics>
</graphite>
-->
<!-- Serve endpoint for Prometheus monitoring. -->
<!--
endpoint - metrics path (relative to root, starting with "/")
port - port to setup server. If not defined or 0 then http_port is used
metrics - send data from table system.metrics
events - send data from table system.events
asynchronous_metrics - send data from table system.asynchronous_metrics
status_info - send data from different component from CH, ex: Dictionaries status
-->
<!--
<prometheus>
<endpoint>/metrics</endpoint>
<port>9363</port>
<metrics>true</metrics>
<events>true</events>
<asynchronous_metrics>true</asynchronous_metrics>
<status_info>true</status_info>
</prometheus>
-->
<!-- Query log. Used only for queries with setting log_queries = 1. -->
<query_log>
<!-- What table to insert data into. If the table does not exist, it will be created.
When query log structure is changed after system update,
then old table will be renamed and new table will be created automatically.
-->
<database>system</database>
<table>query_log</table>
<!--
PARTITION BY expr https://clickhouse.yandex/docs/en/table_engines/custom_partitioning_key/
Example:
event_date
toMonday(event_date)
toYYYYMM(event_date)
toStartOfHour(event_time)
-->
<partition_by>toYYYYMM(event_date)</partition_by>
<!-- Instead of partition_by, you can provide full engine expression (starting with ENGINE = ) with parameters,
Example: <engine>ENGINE = MergeTree PARTITION BY toYYYYMM(event_date) ORDER BY (event_date, event_time) SETTINGS index_granularity = 1024</engine>
-->
<!-- Interval of flushing data. -->
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</query_log>
<!-- Trace log. Stores stack traces collected by query profilers.
See query_profiler_real_time_period_ns and query_profiler_cpu_time_period_ns settings. -->
<trace_log>
<database>system</database>
<table>trace_log</table>
<partition_by>toYYYYMM(event_date)</partition_by>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</trace_log>
<!-- Query thread log. Has information about all threads participated in query execution.
Used only for queries with setting log_query_threads = 1. -->
<query_thread_log>
<database>system</database>
<table>query_thread_log</table>
<partition_by>toYYYYMM(event_date)</partition_by>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</query_thread_log>
<!-- Uncomment if use part log.
Part log contains information about all actions with parts in MergeTree tables (creation, deletion, merges, downloads).
<part_log>
<database>system</database>
<table>part_log</table>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</part_log>
-->
<!-- Uncomment to write text log into table.
Text log contains all information from usual server log but stores it in structured and efficient way.
The level of the messages that go to the table can be limited (<level>); if not specified, all messages will go to the table.
<text_log>
<database>system</database>
<table>text_log</table>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
<level></level>
</text_log>
-->
<!-- Metric log contains rows with current values of ProfileEvents, CurrentMetrics collected with "collect_interval_milliseconds" interval. -->
<metric_log>
<database>system</database>
<table>metric_log</table>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
<collect_interval_milliseconds>1000</collect_interval_milliseconds>
</metric_log>
<!--
Asynchronous metric log contains values of metrics from
system.asynchronous_metrics.
-->
<asynchronous_metric_log>
<database>system</database>
<table>asynchronous_metric_log</table>
<!--
Asynchronous metrics are updated once a minute, so there is
no need to flush more often.
-->
<flush_interval_milliseconds>60000</flush_interval_milliseconds>
</asynchronous_metric_log>
<!-- Parameters for embedded dictionaries, used in Yandex.Metrica.
See https://clickhouse.yandex/docs/en/dicts/internal_dicts/
-->
<!-- Path to file with region hierarchy. -->
<!-- <path_to_regions_hierarchy_file>/opt/geo/regions_hierarchy.txt</path_to_regions_hierarchy_file> -->
<!-- Path to directory with files containing names of regions -->
<!-- <path_to_regions_names_files>/opt/geo/</path_to_regions_names_files> -->
<!-- Configuration of external dictionaries. See:
https://clickhouse.yandex/docs/en/dicts/external_dicts/
-->
<dictionaries_config>*_dictionary.xml</dictionaries_config>
<!-- Uncomment if you want data to be compressed 30-100% better.
Don't do that if you just started using ClickHouse.
-->
<compression incl="clickhouse_compression">
<!--
<!- - Set of variants. Checked in order. Last matching case wins. If nothing matches, lz4 will be used. - ->
<case>
<!- - Conditions. All must be satisfied. Some conditions may be omitted. - ->
<min_part_size>10000000000</min_part_size> <!- - Min part size in bytes. - ->
<min_part_size_ratio>0.01</min_part_size_ratio> <!- - Min size of part relative to whole table size. - ->
<!- - What compression method to use. - ->
<method>zstd</method>
</case>
-->
</compression>
<!-- Allow to execute distributed DDL queries (CREATE, DROP, ALTER, RENAME) on cluster.
Works only if ZooKeeper is enabled. Comment it if such functionality isn't required. -->
<distributed_ddl>
<!-- Path in ZooKeeper to queue with DDL queries -->
<path>/clickhouse/task_queue/ddl</path>
<!-- Settings from this profile will be used to execute DDL queries -->
<!-- <profile>default</profile> -->
</distributed_ddl>
<!-- Settings to fine tune MergeTree tables. See documentation in source code, in MergeTreeSettings.h -->
<!--
<merge_tree>
<max_suspicious_broken_parts>5</max_suspicious_broken_parts>
</merge_tree>
-->
<!-- Protection from accidental DROP.
If size of a MergeTree table is greater than max_table_size_to_drop (in bytes) then the table cannot be dropped with any DROP query.
If you want to delete one table and don't want to change clickhouse-server config, you could create special file <clickhouse-path>/flags/force_drop_table and make DROP once.
By default max_table_size_to_drop is 50GB; max_table_size_to_drop=0 allows to DROP any tables.
The same for max_partition_size_to_drop.
Uncomment to disable protection.
-->
<!-- <max_table_size_to_drop>0</max_table_size_to_drop> -->
<!-- <max_partition_size_to_drop>0</max_partition_size_to_drop> -->
<!-- Example of parameters for GraphiteMergeTree table engine -->
<graphite_rollup_example>
<pattern>
<regexp>click_cost</regexp>
<function>any</function>
<retention>
<age>0</age>
<precision>3600</precision>
</retention>
<retention>
<age>86400</age>
<precision>60</precision>
</retention>
</pattern>
<default>
<function>max</function>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>3600</age>
<precision>300</precision>
</retention>
<retention>
<age>86400</age>
<precision>3600</precision>
</retention>
</default>
</graphite_rollup_example>
<!-- Directory in <clickhouse-path> containing schema files for various input formats.
The directory will be created if it doesn't exist.
-->
<format_schema_path>/var/lib/clickhouse/format_schemas/</format_schema_path>
<!-- Uncomment to use query masking rules.
name - name for the rule (optional)
regexp - RE2 compatible regular expression (mandatory)
replace - substitution string for sensitive data (optional, by default - six asterisks)
<query_masking_rules>
<rule>
<name>hide SSN</name>
<regexp>\b\d{3}-\d{2}-\d{4}\b</regexp>
<replace>000-00-0000</replace>
</rule>
</query_masking_rules>
-->
<!-- Uncomment to use custom http handlers.
rules are checked from top to bottom, first match runs the handler
url - to match request URL, you can use 'regex:' prefix to use regex match(optional)
methods - to match request method, you can use commas to separate multiple method matches(optional)
headers - to match request headers, match each child element(child element name is header name), you can use 'regex:' prefix to use regex match(optional)
handler is request handler
type - supported types: static, dynamic_query_handler, predefined_query_handler
query - use with predefined_query_handler type, executes query when the handler is called
query_param_name - use with dynamic_query_handler type, extracts and executes the value corresponding to the <query_param_name> value in HTTP request params
status - use with static type, response status code
content_type - use with static type, response content-type
response_content - use with static type, Response content sent to client, when using the prefix 'file://' or 'config://', find the content from the file or configuration send to client.
<http_handlers>
<rule>
<url>/</url>
<methods>POST,GET</methods>
<headers><pragma>no-cache</pragma></headers>
<handler>
<type>dynamic_query_handler</type>
<query_param_name>query</query_param_name>
</handler>
</rule>
<rule>
<url>/predefined_query</url>
<methods>POST,GET</methods>
<handler>
<type>predefined_query_handler</type>
<query>SELECT * FROM system.settings</query>
</handler>
</rule>
<rule>
<handler>
<type>static</type>
<status>200</status>
<content_type>text/plain; charset=UTF-8</content_type>
<response_content>config://http_server_default_response</response_content>
</handler>
</rule>
</http_handlers>
-->
<!-- Uncomment to disable ClickHouse internal DNS caching. -->
<!-- <disable_internal_dns_cache>1</disable_internal_dns_cache> -->
</yandex>

View File

@ -0,0 +1,110 @@
<?xml version="1.0"?>
<yandex>
<!-- Profiles of settings. -->
<profiles>
<!-- Default settings. -->
<default>
<!-- Maximum memory usage for processing single query, in bytes. -->
<max_memory_usage>10000000000</max_memory_usage>
<!-- Use cache of uncompressed blocks of data. Meaningful only for processing many of very short queries. -->
<use_uncompressed_cache>0</use_uncompressed_cache>
<!-- How to choose between replicas during distributed query processing.
random - choose random replica from set of replicas with minimum number of errors
nearest_hostname - from set of replicas with minimum number of errors, choose replica
with minimum number of different symbols between replica's hostname and local hostname
(Hamming distance).
in_order - first live replica is chosen in specified order.
first_or_random - if the first replica has a higher number of errors, pick a random one from replicas with minimum number of errors.
-->
<load_balancing>random</load_balancing>
</default>
<!-- Profile that allows only read queries. -->
<readonly>
<readonly>1</readonly>
</readonly>
</profiles>
<!-- Users and ACL. -->
<users>
<!-- If user name was not specified, 'default' user is used. -->
<default>
<!-- Password could be specified in plaintext or in SHA256 (in hex format).
If you want to specify password in plaintext (not recommended), place it in 'password' element.
Example: <password>qwerty</password>.
Password could be empty.
If you want to specify SHA256, place it in 'password_sha256_hex' element.
Example: <password_sha256_hex>65e84be33532fb784c48129675f9eff3a682b27168c0ea744b2cf58ee02337c5</password_sha256_hex>
Restrictions of SHA256: impossibility to connect to ClickHouse using MySQL JS client (as of July 2019).
If you want to specify double SHA1, place it in 'password_double_sha1_hex' element.
Example: <password_double_sha1_hex>e395796d6546b1b65db9d665cd43f0e858dd4303</password_double_sha1_hex>
How to generate decent password:
Execute: PASSWORD=$(base64 < /dev/urandom | head -c8); echo "$PASSWORD"; echo -n "$PASSWORD" | sha256sum | tr -d '-'
In first line will be password and in second - corresponding SHA256.
How to generate double SHA1:
Execute: PASSWORD=$(base64 < /dev/urandom | head -c8); echo "$PASSWORD"; echo -n "$PASSWORD" | sha1sum | tr -d '-' | xxd -r -p | sha1sum | tr -d '-'
In first line will be password and in second - corresponding double SHA1.
-->
<password>public</password>
<!-- List of networks with open access.
To open access from everywhere, specify:
<ip>::/0</ip>
To open access only from localhost, specify:
<ip>::1</ip>
<ip>127.0.0.1</ip>
Each element of list has one of the following forms:
<ip> IP-address or network mask. Examples: 213.180.204.3 or 10.0.0.1/8 or 10.0.0.1/255.255.255.0
2a02:6b8::3 or 2a02:6b8::3/64 or 2a02:6b8::3/ffff:ffff:ffff:ffff::.
<host> Hostname. Example: server01.yandex.ru.
To check access, DNS query is performed, and all received addresses compared to peer address.
<host_regexp> Regular expression for host names. Example, ^server\d\d-\d\d-\d\.yandex\.ru$
To check access, DNS PTR query is performed for peer address and then regexp is applied.
Then, for result of PTR query, another DNS query is performed and all received addresses compared to peer address.
It is strongly recommended that the regexp ends with $
All results of DNS requests are cached till server restart.
-->
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<!-- Settings profile for user. -->
<profile>default</profile>
<!-- Quota for user. -->
<quota>default</quota>
<!-- User can create other users and grant rights to them. -->
<!-- <access_management>1</access_management> -->
</default>
</users>
<!-- Quotas. -->
<quotas>
<!-- Name of quota. -->
<default>
<!-- Limits for time interval. You could specify many intervals with different limits. -->
<interval>
<!-- Length of interval. -->
<duration>3600</duration>
<!-- No limits. Just calculate resource usage for time interval. -->
<queries>0</queries>
<errors>0</errors>
<result_rows>0</result_rows>
<read_rows>0</read_rows>
<execution_time>0</execution_time>
</interval>
</default>
</quotas>
</yandex>

View File

@ -0,0 +1,16 @@
# Docker Compose definition of a single ClickHouse server container.
version: '3.9'
services:
clickhouse:
container_name: clickhouse
image: clickhouse/clickhouse-server:23.1.2.9-alpine
restart: always
volumes:
# Mount users.xml and config.xml from ./clickhouse into the container's
# /etc/clickhouse-server configuration directory.
- ./clickhouse/users.xml:/etc/clickhouse-server/users.xml
- ./clickhouse/config.xml:/etc/clickhouse-server/config.d/config.xml
expose:
- "8123"
ports:
# Publish container port 8123 on the same host port.
- "8123:8123"
networks:
# NOTE(review): the emqx_bridge network is not declared in this file;
# presumably it is defined in another compose file - confirm.
- emqx_bridge

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_bridge, [
{description, "EMQX bridges"},
{vsn, "0.1.11"},
{vsn, "0.1.12"},
{registered, []},
{mod, {emqx_bridge_app, []}},
{applications, [

View File

@ -57,7 +57,8 @@
T == influxdb_api_v2;
T == redis_single;
T == redis_sentinel;
T == redis_cluster
T == redis_cluster;
T == clickhouse
).
load() ->

View File

@ -23,6 +23,7 @@
-define(MONGO_DEFAULT_PORT, 27017).
-define(REDIS_DEFAULT_PORT, 6379).
-define(PGSQL_DEFAULT_PORT, 5432).
-define(CLICKHOUSE_DEFAULT_PORT, 8123).
-define(AUTO_RECONNECT_INTERVAL, 2).

View File

@ -917,6 +917,15 @@ do_call_query(_QM, _Id, _Index, _Ref, _Query, _QueryOpts, _Data) ->
%% return `{error, {recoverable_error, Reason}}`
EXPR
catch
%% For convenience and to make the code in the callbacks cleaner, an
%% error exception with one of the two following formats is translated to
%% the corresponding return value. The receiver of the return value
%% recognizes these special return formats and uses them to decide if a
%% request should be retried.
error:{unrecoverable_error, Msg} ->
{error, {unrecoverable_error, Msg}};
error:{recoverable_error, Msg} ->
{error, {recoverable_error, Msg}};
ERR:REASON:STACKTRACE ->
?RESOURCE_ERROR(exception, #{
name => NAME,

View File

@ -8,3 +8,4 @@ redis
redis_cluster
pgsql
tdengine
clickhouse

View File

@ -0,0 +1,109 @@
emqx_ee_bridge_clickhouse {
local_topic {
desc {
en: """The MQTT topic filter to be forwarded to Clickhouse. All MQTT 'PUBLISH' messages with the topic
matching the local_topic will be forwarded.</br>
NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is
configured, then both the data got from the rule and the MQTT messages that match local_topic
will be forwarded.
"""
zh: """发送到 'local_topic' 的消息都会转发到 Clickhouse。 </br>
注意:如果这个 Bridge 被用作规则(EMQX 规则引擎)的输出,同时也配置了 'local_topic',那么这两部分的消息都会被转发。
"""
}
label {
en: "Local Topic"
zh: "本地 Topic"
}
}
sql_template {
desc {
en: """SQL Template. The template string can contain placeholders
for message metadata and payload field. The placeholders are inserted
without any checking and special formatting so it is important to
ensure that the inserted values are formatted and escaped correctly."""
zh:
"""SQL 模板。模板字符串可以包含消息元数据和有效载荷字段的占位符。占位符
的插入不需要任何检查和特殊格式化,因此必须确保插入的值被正确地格式化和转义。"""
}
label {
en: "SQL Template"
zh: "SQL 模板"
}
}
batch_value_separator {
desc {
en: """The bridge repeats what comes after the VALUES or FORMAT FormatType in the
SQL template to form a batch request. The value specified with
this parameter will be inserted between the values. The default
value ',' works for the VALUES format but other values
might be needed if you specify some other format with the
clickhouse FORMAT syntax.
See https://clickhouse.com/docs/en/sql-reference/statements/insert-into/ and
https://clickhouse.com/docs/en/interfaces/formats#formats for more information about
the format syntax and the available formats."""
zh: """桥接会重复 SQL 模板中 VALUES 或 FORMAT FormatType 之后的内容,以形成一个批处理请求。
用这个参数指定的值将被插入到这些值之间。默认值 ',' 适用于 VALUES 格式,
但是如果你用 Clickhouse 的 FORMAT 语法指定了其他格式,则可能需要其他值。
参见 https://clickhouse.com/docs/en/sql-reference/statements/insert-into/ 和
https://clickhouse.com/docs/en/interfaces/formats#formats 了解更多关于
格式语法和可用格式的信息。"""
}
label {
en: "Batch Value Separator"
zh: "批量值分离器"
}
}
config_enable {
desc {
en: """Enable or disable this bridge"""
zh: """启用/禁用桥接"""
}
label {
en: "Enable Or Disable Bridge"
zh: "启用/禁用桥接"
}
}
desc_config {
desc {
en: """Configuration for a Clickhouse bridge."""
zh: """Clickhouse 桥接配置"""
}
label: {
en: "Clickhouse Bridge Configuration"
zh: "Clickhouse 桥接配置"
}
}
desc_type {
desc {
en: """The Bridge Type"""
zh: """Bridge 类型"""
}
label {
en: "Bridge Type"
zh: "桥接类型"
}
}
desc_name {
desc {
en: """Bridge name."""
zh: """桥接名字"""
}
label {
en: "Bridge Name"
zh: "桥接名字"
}
}
}

View File

@ -29,7 +29,8 @@ api_schemas(Method) ->
ref(emqx_ee_bridge_redis, Method ++ "_cluster"),
ref(emqx_ee_bridge_timescale, Method),
ref(emqx_ee_bridge_matrix, Method),
ref(emqx_ee_bridge_tdengine, Method)
ref(emqx_ee_bridge_tdengine, Method),
ref(emqx_ee_bridge_clickhouse, Method)
].
schema_modules() ->
@ -44,7 +45,8 @@ schema_modules() ->
emqx_ee_bridge_pgsql,
emqx_ee_bridge_timescale,
emqx_ee_bridge_matrix,
emqx_ee_bridge_tdengine
emqx_ee_bridge_tdengine,
emqx_ee_bridge_clickhouse
].
examples(Method) ->
@ -75,7 +77,8 @@ resource_type(redis_cluster) -> emqx_ee_connector_redis;
resource_type(pgsql) -> emqx_connector_pgsql;
resource_type(timescale) -> emqx_connector_pgsql;
resource_type(matrix) -> emqx_connector_pgsql;
resource_type(tdengine) -> emqx_ee_connector_tdengine.
resource_type(tdengine) -> emqx_ee_connector_tdengine;
resource_type(clickhouse) -> emqx_ee_connector_clickhouse.
fields(bridges) ->
[
@ -119,7 +122,8 @@ fields(bridges) ->
required => false
}
)}
] ++ mongodb_structs() ++ influxdb_structs() ++ redis_structs() ++ pgsql_structs().
] ++ mongodb_structs() ++ influxdb_structs() ++ redis_structs() ++ pgsql_structs() ++
clickhouse_structs().
mongodb_structs() ->
[
@ -183,3 +187,18 @@ pgsql_structs() ->
{matrix, <<"Matrix">>}
]
].
%% Schema entries for the Clickhouse bridge: a map from bridge name to the
%% "config" struct declared in emqx_ee_bridge_clickhouse.
clickhouse_structs() ->
    [
        {Type,
            mk(
                hoconsc:map(name, ref(emqx_ee_bridge_clickhouse, "config")),
                #{
                    %% Name is prepended verbatim, so the literal must start
                    %% with a space and must not repeat the bridge name.
                    desc => <<Name/binary, " Bridge Config">>,
                    required => false
                }
            )}
     || {Type, Name} <- [
            {clickhouse, <<"Clickhouse">>}
        ]
    ].

View File

@ -0,0 +1,143 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_bridge_clickhouse).
-include_lib("emqx_bridge/include/emqx_bridge.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
-import(hoconsc, [mk/2, enum/1, ref/2]).
-export([
conn_bridge_examples/1
]).
-export([
namespace/0,
roots/0,
fields/1,
desc/1
]).
-define(DEFAULT_SQL,
<<"INSERT INTO mqtt_test(payload, arrived) VALUES ('${payload}', ${timestamp})">>
).
-define(DEFAULT_BATCH_VALUE_SEPARATOR, <<", ">>).
%% -------------------------------------------------------------------------------------------------
%% Callback used by HTTP API
%% -------------------------------------------------------------------------------------------------
%% Builds the example list for the given HTTP method; the single entry is
%% keyed by the bridge type and carries a summary plus an example value.
conn_bridge_examples(Method) ->
    Example = #{
        summary => <<"Clickhouse Bridge">>,
        value => values(Method, "clickhouse")
    },
    [#{<<"clickhouse">> => Example}].
%% Example bridge configuration per HTTP method.
%%  - post: the full example configuration
%%  - get:  the post example merged with ?METRICS_EXAMPLE
%%  - put:  same as post
values(get, Type) ->
    maps:merge(values(post, Type), ?METRICS_EXAMPLE);
values(post, Type) ->
    #{
        enable => true,
        type => Type,
        name => <<"foo">>,
        %% The connector schema declares a `url` field (there is no
        %% `server` field), so the example must use it as well.
        url => <<"http://127.0.0.1:8123">>,
        database => <<"mqtt">>,
        pool_size => 8,
        username => <<"default">>,
        password => <<"public">>,
        sql => ?DEFAULT_SQL,
        batch_value_separator => ?DEFAULT_BATCH_VALUE_SEPARATOR,
        local_topic => <<"local/topic/#">>,
        resource_opts => #{
            worker_pool_size => 8,
            health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
            auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
            batch_size => ?DEFAULT_BATCH_SIZE,
            batch_time => ?DEFAULT_BATCH_TIME,
            query_mode => async,
            max_queue_bytes => ?DEFAULT_QUEUE_SIZE
        }
    };
values(put, Type) ->
    values(post, Type).
%% -------------------------------------------------------------------------------------------------
%% Hocon Schema Definitions
%% -------------------------------------------------------------------------------------------------
%% Namespace string of this schema module.
namespace() -> "bridge_clickhouse".
%% This module declares no root fields.
roots() -> [].
%% Field definitions per struct name:
%%  - "config":        bridge-specific fields followed by the connector fields
%%  - "creation_opts": resource creation options minus the hidden ones
%%  - "post":          type and name fields followed by the "config" fields
%%  - "put":           same as "config"
%%  - "get":           bridge status fields followed by the "post" fields
fields("config") ->
    EnableField =
        {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
    SqlField =
        {sql,
            mk(
                binary(),
                #{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>}
            )},
    SeparatorField =
        {batch_value_separator,
            mk(
                binary(),
                #{
                    desc => ?DESC("batch_value_separator"),
                    default => ?DEFAULT_BATCH_VALUE_SEPARATOR
                }
            )},
    LocalTopicField =
        {local_topic,
            mk(
                binary(),
                #{desc => ?DESC("local_topic"), default => undefined}
            )},
    ResourceOptsField =
        {resource_opts,
            mk(
                ref(?MODULE, "creation_opts"),
                #{
                    required => false,
                    default => #{},
                    desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
                }
            )},
    BridgeFields = [
        EnableField,
        SqlField,
        SeparatorField,
        LocalTopicField,
        ResourceOptsField
    ],
    BridgeFields ++ emqx_ee_connector_clickhouse:fields(config);
fields("creation_opts") ->
    [
        Opt
     || {Name, _} = Opt <- emqx_resource_schema:fields("creation_opts"),
        not is_hidden_opts(Name)
    ];
fields("post") ->
    fields("post", clickhouse);
fields("put") ->
    fields("config");
fields("get") ->
    emqx_bridge_schema:status_fields() ++ fields("post").
%% "post" fields for the given bridge type: type, name, then the config fields.
fields("post", Type) ->
    [type_field(Type), name_field()] ++ fields("config").
%% Description for a struct name; `undefined` for unknown names.
desc("config") ->
    ?DESC("desc_config");
desc("creation_opts" = Name) ->
    emqx_resource_schema:desc(Name);
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
    UpperMethod = string:to_upper(Method),
    ["Configuration for Clickhouse using `", UpperMethod, "` method."];
desc(_) ->
    undefined.
%% -------------------------------------------------------------------------------------------------
%% internal
%% -------------------------------------------------------------------------------------------------
%% True for creation options that are filtered out of "creation_opts".
is_hidden_opts(async_inflight_window) -> true;
is_hidden_opts(_Field) -> false.
%% Required `type` field restricted to the single given bridge type.
type_field(Type) ->
    Schema = mk(enum([Type]), #{required => true, desc => ?DESC("desc_type")}),
    {type, Schema}.
%% Required binary `name` field.
name_field() ->
    Schema = mk(binary(), #{required => true, desc => ?DESC("desc_name")}),
    {name, Schema}.

View File

@ -0,0 +1,325 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_bridge_clickhouse_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-define(CLICKHOUSE_HOST, "clickhouse").
-define(CLICKHOUSE_RESOURCE_MOD, emqx_ee_connector_clickhouse).
-include_lib("emqx_connector/include/emqx_connector.hrl").
%% See comment in
%% lib-ee/emqx_ee_connector/test/ee_connector_clickhouse_SUITE.erl for how to
%% run this without bringing up the whole CI infrastructure
%%------------------------------------------------------------------------------
%% Common Test Setup, Teardown and Testcase List
%%------------------------------------------------------------------------------
%% Starts the required applications and prepares the test database when the
%% Clickhouse server is reachable. Otherwise the suite is skipped, except
%% when the IS_CI environment variable is "yes", in which case it throws.
init_per_suite(Config) ->
    ServerAvailable = emqx_common_test_helpers:is_tcp_server_available(
        ?CLICKHOUSE_HOST, ?CLICKHOUSE_DEFAULT_PORT
    ),
    case ServerAvailable of
        true ->
            emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
            ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
            ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
            {ok, _} = application:ensure_all_started(emqx_connector),
            {ok, _} = application:ensure_all_started(emqx_ee_connector),
            {ok, _} = application:ensure_all_started(emqx_ee_bridge),
            snabbkaffe:fix_ct_logging(),
            %% Create the database and the table used by the test cases
            Connection = start_clickhouse_connection(),
            {ok, _, _} = clickhouse:query(Connection, sql_create_database(), #{}),
            {ok, _, _} = clickhouse:query(Connection, sql_create_table(), []),
            clickhouse:query(Connection, sql_find_key(42), []),
            [{clickhouse_connection, Connection} | Config];
        false ->
            case os:getenv("IS_CI") of
                "yes" -> throw(no_clickhouse);
                _ -> {skip, no_clickhouse}
            end
    end.
%% Starts a Clickhouse client connection owned by a helper process and
%% returns the connection.
%%
%% The connection is started in a sub process so that it does not go down
%% with the process that is calling init_per_suite. The helper is monitored
%% while we wait for it, so a failure to connect raises an error instead of
%% blocking the suite forever.
start_clickhouse_connection() ->
    InitPerSuiteProcess = self(),
    {HelperPid, HelperRef} = erlang:spawn_monitor(
        fun() ->
            {ok, Conn} =
                clickhouse:start_link([
                    {url, clickhouse_url()},
                    {user, <<"default">>},
                    {key, "public"},
                    {pool, tmp_pool}
                ]),
            InitPerSuiteProcess ! {clickhouse_connection, Conn},
            %% Stay alive for as long as the connection process lives
            Ref = erlang:monitor(process, Conn),
            receive
                {'DOWN', Ref, process, _, _} ->
                    erlang:display(helper_down),
                    ok
            end
        end
    ),
    receive
        {clickhouse_connection, C} ->
            %% The helper outlives this call; stop watching it
            erlang:demonitor(HelperRef, [flush]),
            C;
        {'DOWN', HelperRef, process, HelperPid, Reason} ->
            erlang:error({clickhouse_connection_failed, Reason})
    after 30000 ->
        erlang:demonitor(HelperRef, [flush]),
        erlang:exit(HelperPid, kill),
        erlang:error(clickhouse_connection_timeout)
    end.
%% Stops the helper connection and the applications used by the suite.
end_per_suite(Config) ->
    Connection = proplists:get_value(clickhouse_connection, Config),
    clickhouse:stop(Connection),
    ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
    ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
    lists:foreach(
        fun(App) -> _ = application:stop(App) end,
        [emqx_connector, emqx_ee_connector]
    ),
    _ = application:stop(emqx_bridge).
%% Every test case starts with a freshly created, empty table.
init_per_testcase(_TestCase, Config) ->
    ok = reset_table(Config),
    Config.
%% Recreates the table after each test case.
end_per_testcase(_TestCase, Config) ->
    ok = reset_table(Config).
%% Test case list, obtained from emqx_common_test_helpers:all/1 for this module.
all() ->
    emqx_common_test_helpers:all(?MODULE).
%%------------------------------------------------------------------------------
%% Helper functions for test cases
%%------------------------------------------------------------------------------
%% Bridge SQL template using the VALUES syntax.
sql_insert_template_for_bridge() ->
    "INSERT INTO mqtt_test(key, data, arrived) VALUES "
    "(${key}, '${data}', ${timestamp})".
%% Bridge SQL template using the FORMAT JSONCompactEachRow syntax. The double
%% quotes are backslash-escaped because clickhouse_config/1 places the
%% template inside a double-quoted string of the generated config text.
sql_insert_template_for_bridge_json() ->
    "INSERT INTO mqtt_test(key, data, arrived) FORMAT JSONCompactEachRow "
    "[${key}, \\\"${data}\\\", ${timestamp}]".
%% Statement that creates the test table if it does not exist.
sql_create_table() ->
    "CREATE TABLE IF NOT EXISTS mqtt.mqtt_test (key BIGINT, data String, arrived BIGINT) ENGINE = Memory".
%% Query selecting the rows whose key equals Key (returned as an iolist).
sql_find_key(Key) ->
    io_lib:format("SELECT key FROM mqtt.mqtt_test WHERE key = ~p", [Key]).
%% Query selecting the key column of every row in the test table.
sql_find_all_keys() ->
    "SELECT key FROM mqtt.mqtt_test".
%% Statement that drops the test table if it exists.
sql_drop_table() ->
    "DROP TABLE IF EXISTS mqtt.mqtt_test".
%% Statement that creates the test database if it does not exist.
sql_create_database() ->
    "CREATE DATABASE IF NOT EXISTS mqtt".
%% HTTP URL of the Clickhouse test server as a binary.
clickhouse_url() ->
    Port = erlang:integer_to_list(?CLICKHOUSE_DEFAULT_PORT),
    UrlParts = [<<"http://">>, ?CLICKHOUSE_HOST, ":", Port],
    erlang:iolist_to_binary(UrlParts).
%% Renders a bridge configuration as config text, parses and checks it, and
%% returns the resulting raw bridge config.
%%
%% Recognized keys in Config (all optional): sql, batch_value_separator,
%% batch_size, batch_time_ms and enable_batch.
clickhouse_config(Config) ->
    SQL = maps:get(sql, Config, sql_insert_template_for_bridge()),
    BatchSeparator = maps:get(batch_value_separator, Config, <<", ">>),
    BatchSize = maps:get(batch_size, Config, 1),
    BatchTime = maps:get(batch_time_ms, Config, 0),
    EnableBatch = maps:get(enable_batch, Config, true),
    Name = atom_to_binary(?MODULE),
    URL = clickhouse_url(),
    ConfigString =
        io_lib:format(
            "bridges.clickhouse.~s {\n"
            "  enable = true\n"
            "  url = \"~s\"\n"
            "  database = \"mqtt\"\n"
            "  sql = \"~s\"\n"
            %% Every field goes on its own line; without the trailing
            %% newline this one would run into resource_opts.
            "  batch_value_separator = \"~s\"\n"
            "  resource_opts = {\n"
            "    enable_batch = ~w\n"
            "    batch_size = ~b\n"
            "    batch_time = ~bms\n"
            "  }\n"
            "}\n",
            [
                Name,
                URL,
                SQL,
                BatchSeparator,
                EnableBatch,
                BatchSize,
                BatchTime
            ]
        ),
    ct:pal(ConfigString),
    parse_and_check(ConfigString, <<"clickhouse">>, Name).
%% Parses the config text, checks it against the bridge schema and returns
%% the raw config of the bridge with the given type and name.
parse_and_check(ConfigString, BridgeType, Name) ->
    {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
    CheckOpts = #{required => false, atom_key => false},
    hocon_tconf:check_plain(emqx_bridge_schema, RawConf, CheckOpts),
    #{<<"bridges">> := #{BridgeType := #{Name := BridgeConf}}} = RawConf,
    BridgeConf.
%% Creates the test bridge (named after this module) and returns its bridge id.
make_bridge(Config) ->
    BridgeType = <<"clickhouse">>,
    BridgeName = atom_to_binary(?MODULE),
    {ok, _} = emqx_bridge:create(BridgeType, BridgeName, clickhouse_config(Config)),
    emqx_bridge_resource:bridge_id(BridgeType, BridgeName).
%% Removes the test bridge created by make_bridge/1.
delete_bridge() ->
    {ok, _} = emqx_bridge:remove(<<"clickhouse">>, atom_to_binary(?MODULE)),
    ok.
%% Drops and recreates the test table using the suite's helper connection.
reset_table(Config) ->
    Connection = proplists:get_value(clickhouse_connection, Config),
    lists:foreach(
        fun(Statement) ->
            {ok, _, _} = clickhouse:query(Connection, Statement, [])
        end,
        [sql_drop_table(), sql_create_table()]
    ),
    ok.
%% Polls for Key using the connection stored in the CT config, giving up
%% after AttemptsLeft attempts.
check_key_in_clickhouse(AttemptsLeft, Key, Config) ->
    Connection = proplists:get_value(clickhouse_connection, Config),
    check_key_in_clickhouse(AttemptsLeft, Key, none, Connection).
%% Same as check_key_in_clickhouse/3 with 30 attempts.
check_key_in_clickhouse(Key, Config) ->
    check_key_in_clickhouse(30, Key, Config).
%% Queries for Key until the trimmed response equals the key, sleeping 100 ms
%% between attempts. Fails the test case when no attempts are left.
check_key_in_clickhouse(0, Key, PrevResult, _) ->
    ct:fail("Expected ~p in database but got ~s", [Key, PrevResult]);
check_key_in_clickhouse(AttemptsLeft, Key, _, Connection) ->
    {ok, 200, Response} = clickhouse:query(Connection, sql_find_key(Key), []),
    Wanted = erlang:integer_to_binary(Key),
    Found = iolist_to_binary(string:trim(Response)),
    case Found =:= Wanted of
        true ->
            ok;
        false ->
            timer:sleep(100),
            check_key_in_clickhouse(AttemptsLeft - 1, Key, Found, Connection)
    end.
%%------------------------------------------------------------------------------
%% Test Cases
%%------------------------------------------------------------------------------
%% A created bridge shows up in the bridge list and is gone after deletion.
t_make_delete_bridge(_Config) ->
    make_bridge(#{}),
    %% Check that the new bridge is in the list of bridges
    BridgesBeforeDelete = emqx_bridge:list(),
    ExpectedName = atom_to_binary(?MODULE),
    HasExpectedName =
        fun
            (#{name := BridgeName}) -> BridgeName =:= ExpectedName;
            (_) -> false
        end,
    true = lists:any(HasExpectedName, BridgesBeforeDelete),
    delete_bridge(),
    false = lists:any(HasExpectedName, emqx_bridge:list()),
    ok.
%% A single message sent through a non-batching bridge reaches the database.
t_send_message_query(Config) ->
    BridgeID = make_bridge(#{enable_batch => false}),
    MessageKey = 42,
    %% The bridge renders its own SQL template with this data
    Message = #{key => MessageKey, data => <<"clickhouse_data">>, timestamp => 10000},
    emqx_bridge:send_message(BridgeID, Message),
    %% Check that the data got to the database
    check_key_in_clickhouse(MessageKey, Config),
    delete_bridge(),
    ok.
%% Runs send_simple_batch_helper/2 without extra bridge settings.
t_send_simple_batch(Config) ->
    send_simple_batch_helper(Config, #{}).
%% Runs send_simple_batch_helper/2 with the JSONCompactEachRow template and
%% an empty batch value separator.
t_send_simple_batch_alternative_format(Config) ->
    JsonFormatSettings = #{
        sql => sql_insert_template_for_bridge_json(),
        batch_value_separator => <<"">>
    },
    send_simple_batch_helper(Config, JsonFormatSettings).
%% Sends one message through a batching bridge and checks that it arrives.
%% BridgeConfigExt overrides the default batch settings.
send_simple_batch_helper(Config, BridgeConfigExt) ->
    Defaults = #{batch_size => 100, enable_batch => true},
    BridgeID = make_bridge(maps:merge(Defaults, BridgeConfigExt)),
    MessageKey = 42,
    %% The bridge renders its own SQL template with this data
    Message = #{key => MessageKey, data => <<"clickhouse_data">>, timestamp => 10000},
    emqx_bridge:send_message(BridgeID, Message),
    check_key_in_clickhouse(MessageKey, Config),
    delete_bridge(),
    ok.
%% Runs heavy_batching_helper/2 without extra bridge settings.
t_heavy_batching(Config) ->
    heavy_batching_helper(Config, #{}).
%% Runs heavy_batching_helper/2 with the JSONCompactEachRow template and an
%% empty batch value separator.
t_heavy_batching_alternative_format(Config) ->
    JsonFormatSettings = #{
        sql => sql_insert_template_for_bridge_json(),
        batch_value_separator => <<"">>
    },
    heavy_batching_helper(Config, JsonFormatSettings).
%% Sends 10000 messages through a batching bridge and verifies that every
%% key from 1 to 10000 ends up in the table exactly as a distinct key.
heavy_batching_helper(Config, BridgeConfigExt) ->
    Connection = proplists:get_value(clickhouse_connection, Config),
    NumberOfMessages = 10000,
    Defaults = #{
        batch_size => 743,
        batch_time_ms => 50,
        enable_batch => true
    },
    BridgeID = make_bridge(maps:merge(Defaults, BridgeConfigExt)),
    lists:foreach(
        fun(Key) ->
            Message = #{
                key => Key,
                data => <<"clickhouse_data">>,
                timestamp => 10000
            },
            emqx_bridge:send_message(BridgeID, Message)
        end,
        lists:seq(1, NumberOfMessages)
    ),
    %% Wait until the last message is in clickhouse.
    %% The delay between attempts is 100ms so 150 attempts means 15 seconds
    check_key_in_clickhouse(_AttemptsToFindKey = 150, NumberOfMessages, Config),
    %% In case the messages are not sent in order (could happen with multiple
    %% buffer workers)
    timer:sleep(1000),
    {ok, 200, Response} = clickhouse:query(Connection, sql_find_all_keys(), []),
    TrimmedResponse = iolist_to_binary(string:trim(Response)),
    Keys = [
        erlang:binary_to_integer(iolist_to_binary(KeyString))
     || KeyString <- string:lexemes(TrimmedResponse, "\n")
    ],
    KeySet = maps:from_keys(Keys, true),
    %% The number of distinct keys must equal the number of messages sent
    NumberOfMessages = maps:size(KeySet),
    IsPresent = fun(Key) -> maps:get(Key, KeySet, false) end,
    true = lists:all(IsPresent, lists:seq(1, NumberOfMessages)),
    delete_bridge(),
    ok.

View File

@ -1,2 +1,3 @@
toxiproxy
influxdb
clickhouse

View File

@ -0,0 +1,15 @@
emqx_ee_connector_clickhouse {
base_url {
desc {
en: """The HTTP URL to the Clickhouse server that you want to connect to (for example http://myhostname:8123)"""
zh: """你想连接到的 Clickhouse 服务器的 HTTP URL(例如 http://myhostname:8123)。"""
}
label: {
en: "URL to clickhouse server"
zh: "到clickhouse服务器的URL"
}
}
}

View File

@ -3,6 +3,7 @@
{hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.2.5"}}},
{influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.9"}}},
{tdengine, {git, "https://github.com/emqx/tdengine-client-erl", {tag, "0.1.5"}}},
{clickhouse, {git, "https://github.com/emqx/clickhouse-client-erl", {tag, "0.2"}}},
{emqx, {path, "../../apps/emqx"}}
]}.

View File

@ -9,7 +9,8 @@
influxdb,
tdengine,
wolff,
brod
brod,
clickhouse
]},
{env, []},
{modules, []},

View File

@ -0,0 +1,444 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ee_connector_clickhouse).
-include_lib("emqx_connector/include/emqx_connector.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-behaviour(emqx_resource).
-import(hoconsc, [mk/2, enum/1, ref/2]).
%%=====================================================================
%% Exports
%%=====================================================================
%% Hocon config schema exports
-export([
roots/0,
fields/1,
values/1
]).
%% callbacks for behaviour emqx_resource
-export([
callback_mode/0,
on_start/2,
on_stop/2,
on_query/3,
on_batch_query/3,
on_get_status/2
]).
%% callbacks for ecpool
-export([connect/1]).
%% Internal exports used to execute code with ecpool worker
-export([
check_database_status/1,
execute_sql_in_clickhouse_server_using_connection/2
]).
%%=====================================================================
%% Types
%%=====================================================================
-type url() :: emqx_http_lib:uri_map().
-reflect_type([url/0]).
-typerefl_from_string({url/0, emqx_http_lib, uri_parse}).
-type templates() ::
#{}
| #{
send_message_template := term(),
extend_send_message_template := term()
}.
-type state() ::
#{
templates := templates(),
poolname := atom()
}.
-type clickhouse_config() :: map().
%%=====================================================================
%% Configuration and default values
%%=====================================================================
%% The single root `config` refers to this module's `config` struct.
roots() ->
    ConfigType = hoconsc:ref(?MODULE, config),
    [{config, #{type => ConfigType}}].
%% Connector fields: a required `url` (rejected when it carries a query part)
%% followed by the common relational database fields.
fields(config) ->
    RejectQuery =
        fun
            (#{query := _Query}) ->
                {error, "There must be no query in the url"};
            (_) ->
                ok
        end,
    UrlField =
        {url,
            hoconsc:mk(
                url(),
                #{
                    required => true,
                    validator => RejectQuery,
                    desc => ?DESC("base_url")
                }
            )},
    [UrlField] ++ emqx_connector_schema_lib:relational_db_fields().
%% Example connector values per HTTP method. `post` and `get` are the `put`
%% example plus a name; any other method yields an empty map.
values(put) ->
    #{
        database => <<"mqtt">>,
        enable => true,
        pool_size => 8,
        type => clickhouse,
        url => <<"http://127.0.0.1:8123">>
    };
values(Method) when Method =:= post; Method =:= get ->
    PutExample = values(put),
    PutExample#{name => <<"connector">>};
values(_) ->
    #{}.
%% ===================================================================
%% Callbacks defined in emqx_resource
%% ===================================================================
%% emqx_resource callback: this connector reports the `always_sync` mode.
callback_mode() -> always_sync.
%% -------------------------------------------------------------------
%% on_start callback and related functions
%% -------------------------------------------------------------------
%% Prepares the SQL templates and starts the connection pool.
%% Returns `{ok, State}` on success. A pool start error or any exception is
%% logged through log_start_error/3 and returned as `{error, Reason}`.
-spec on_start(resource_id(), clickhouse_config()) -> {ok, state()} | {error, _}.
on_start(
    InstanceID,
    #{
        url := URL,
        database := DB,
        pool_size := PoolSize
    } = Config
) ->
    ?SLOG(info, #{
        msg => "starting_clickhouse_connector",
        connector => InstanceID,
        config => emqx_misc:redact(Config)
    }),
    PoolName = emqx_plugin_libs_pool:pool_name(InstanceID),
    User = maps:get(username, Config, "default"),
    %% The password is wrapped here and unwrapped again in connect/1
    WrappedKey = emqx_secret:wrap(maps:get(password, Config, "public")),
    PoolOptions = [
        {url, URL},
        {user, User},
        {key, WrappedKey},
        {database, DB},
        {auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
        {pool_size, PoolSize},
        {pool, PoolName}
    ],
    BaseState = #{poolname => PoolName},
    try
        State = BaseState#{templates => prepare_sql_templates(Config)},
        case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, PoolOptions) of
            ok ->
                {ok, State};
            {error, Reason} ->
                log_start_error(Config, Reason, none),
                {error, Reason}
        end
    catch
        _:CatchReason:Stacktrace ->
            log_start_error(Config, CatchReason, Stacktrace),
            {error, CatchReason}
    end.
%% Logs a failed connector start and emits a trace point. The stacktrace is
%% added to the log entry unless it is the atom `none`.
log_start_error(Config, Reason, Stacktrace) ->
    BaseEntry =
        #{
            msg => "clickhouse_connector_start_failed",
            error_reason => Reason,
            config => emqx_misc:redact(Config)
        },
    LogEntry =
        case Stacktrace of
            none -> BaseEntry;
            _ -> BaseEntry#{stacktrace => Stacktrace}
        end,
    ?SLOG(info, LogEntry),
    ?tp(
        clickhouse_connector_start_failed,
        #{error => Reason}
    ).
%% Helper functions to prepare SQL templates

%% Preprocesses the insert template and the template used to append further
%% rows in a batch. Returns an empty map when the config has no `sql` and
%% `batch_value_separator` keys.
prepare_sql_templates(#{sql := SqlTemplate, batch_value_separator := Separator}) ->
    SendMessageTemplate = emqx_plugin_libs_rule:preproc_tmpl(SqlTemplate),
    ExtendTemplate = prepare_sql_bulk_extend_template(SqlTemplate, Separator),
    #{
        send_message_template => SendMessageTemplate,
        extend_send_message_template => ExtendTemplate
    };
prepare_sql_templates(_) ->
    %% We don't create any templates if this is a non-bridge connector
    #{}.
%% Builds the preprocessed template for one additional batch row: the
%% separator followed by the value part of the insert statement, so that the
%% rendered result can be appended to a rendered insert statement.
prepare_sql_bulk_extend_template(Template, Separator) ->
    ValuePart = split_clickhouse_insert_sql(Template),
    emqx_plugin_libs_rule:preproc_tmpl(iolist_to_binary([Separator, ValuePart])).
%% This function is similar to emqx_plugin_libs_rule:split_insert_sql/1 but can
%% also handle Clickhouse's SQL extension for INSERT statements that allows the
%% user to specify different formats:
%%
%% https://clickhouse.com/docs/en/sql-reference/statements/insert-into/
%%
%% Returns the part of the statement that follows `VALUES` or
%% `FORMAT <FormatName>`. Raises an error when the template cannot be split
%% into exactly one head and one value part, or when the head does not start
%% with INSERT. The INSERT keyword is matched case-insensitively, in line
%% with the case-insensitive matching of VALUES and FORMAT.
split_clickhouse_insert_sql(SQL) ->
    ErrorMsg = <<"The SQL template should be an SQL INSERT statement but it is something else.">>,
    case
        re:split(SQL, "(\\s+(?i:values)|(?i:format\\s+(?:[A-Za-z0-9_])+)\\s+)", [{return, binary}])
    of
        [Head, _Keyword, ValuePart] ->
            NormalizedHead = string:lowercase(string:trim(Head, leading)),
            case string:prefix(NormalizedHead, <<"insert">>) of
                nomatch ->
                    erlang:error(ErrorMsg);
                _Rest ->
                    ValuePart
            end;
        _ ->
            erlang:error(ErrorMsg)
    end.
% This is a callback for ecpool which is triggered by the call to
% emqx_plugin_libs_pool:start_pool in on_start/2.
% It normalizes the URL, unwraps the secret and starts a clickhouse client
% with the resulting options.
connect(Options) ->
    Get = fun(Key) -> proplists:get_value(Key, Options) end,
    URL = iolist_to_binary(emqx_http_lib:normalize(Get(url))),
    User = Get(user),
    Database = Get(database),
    Key = emqx_secret:unwrap(Get(key)),
    Pool = Get(pool),
    PoolSize = Get(pool_size),
    ClientOptions = [
        {url, URL},
        {database, Database},
        {user, User},
        {key, Key},
        {pool, Pool},
        {pool_size, PoolSize}
    ],
    case clickhouse:start_link(ClientOptions) of
        {ok, _Conn} = Started ->
            Started;
        {error, Reason} ->
            {error, Reason}
    end.
%% -------------------------------------------------------------------
%% on_stop emqx_resouce callback
%% -------------------------------------------------------------------
%% Logs the stop request and stops the connection pool named in the state.
-spec on_stop(resource_id(), resource_state()) -> term().
on_stop(ResourceID, #{poolname := PoolName}) ->
    ?SLOG(info, #{
        %% Spelling fixed ("clickouse") so the entry can be found when
        %% searching the logs for "clickhouse".
        msg => "stopping clickhouse connector",
        connector => ResourceID
    }),
    emqx_plugin_libs_pool:stop_pool(PoolName).
%% -------------------------------------------------------------------
%% on_get_status emqx_resouce callback and related functions
%% -------------------------------------------------------------------
%% Reports `connected` when the pool health check returns true and
%% `connecting` when it returns false.
on_get_status(_ResourceID, #{poolname := Pool} = _State) ->
    CheckFun = fun ?MODULE:check_database_status/1,
    case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, CheckFun) of
        true -> connected;
        false -> connecting
    end.
%% Health check run for each pool worker by on_get_status/2; delegates to
%% clickhouse:status/1.
check_database_status(Connection) ->
    clickhouse:status(Connection).
%% -------------------------------------------------------------------
%% on_query emqx_resouce callback and related functions
%% -------------------------------------------------------------------
%% Handles a single request. `{send_message, Data}` renders the prepared SQL
%% template with Data; `{sql, SQL}` and `{query, SQL}` run SQL as given.
-spec on_query
    (resource_id(), Request, resource_state()) -> query_result() when
        Request :: {RequestType, Data},
        RequestType :: send_message,
        Data :: map();
    (resource_id(), Request, resource_state()) -> query_result() when
        Request :: {RequestType, SQL},
        RequestType :: sql | query,
        SQL :: binary().
on_query(ResourceID, {RequestType, DataOrSQL}, #{poolname := PoolName} = State) ->
    ?SLOG(debug, #{
        msg => "clickhouse connector received sql query",
        connector => ResourceID,
        type => RequestType,
        sql => DataOrSQL,
        state => State
    }),
    %% Have we got a query or data to fit into an SQL template?
    Kind = query_type(RequestType),
    #{templates := Templates} = State,
    SQL = get_sql(Kind, Templates, DataOrSQL),
    Reply = execute_sql_in_clickhouse_server(PoolName, SQL),
    transform_and_log_clickhouse_result(Reply, ResourceID, SQL).
%% Renders the prepared insert template for `send_message` requests when such
%% a template exists; otherwise the third argument is returned unchanged.
get_sql(RequestType, Templates, DataOrSQL) ->
    case {RequestType, Templates} of
        {send_message, #{send_message_template := PreparedSQL}} ->
            emqx_plugin_libs_rule:proc_tmpl(PreparedSQL, DataOrSQL);
        _ ->
            DataOrSQL
    end.
%% Maps a request type to its simplified form: `sql` and `query` both mean
%% `query`.
query_type(RequestType) when RequestType =:= sql; RequestType =:= query ->
    query;
%% Data that goes to bridges use the prepared template
query_type(send_message) ->
    send_message.
%% -------------------------------------------------------------------
%% on_batch_query emqx_resouce callback and related functions
%% -------------------------------------------------------------------
%% Handles a batch of `{send_message, Data}` requests by rendering them into
%% one insert statement and executing it.
-spec on_batch_query(resource_id(), BatchReq, resource_state()) -> query_result() when
    BatchReq :: nonempty_list({'send_message', map()}).
on_batch_query(ResourceID, BatchReq, State) ->
    %% Currently we only support batch requests with the send_message key
    {RequestTypes, Objects} = lists:unzip(BatchReq),
    ensure_keys_are_of_type_send_message(RequestTypes),
    %% Pick out the SQL templates and the pool
    #{
        templates := Templates,
        poolname := PoolName
    } = State,
    %% Create the batch insert statement and run it in the database
    SQL = objects_to_sql(Objects, Templates),
    Reply = execute_sql_in_clickhouse_server(PoolName, SQL),
    %% Transform the result to a better format
    transform_and_log_clickhouse_result(Reply, ResourceID, SQL).
%% Returns ok when every key is `send_message`; otherwise raises an
%% `unrecoverable_error` error exception.
ensure_keys_are_of_type_send_message(Keys) ->
    Unexpected = [Key || Key <- Keys, not is_send_message_atom(Key)],
    case Unexpected of
        [] ->
            ok;
        [_ | _] ->
            erlang:error(
                {unrecoverable_error,
                    <<"Unexpected type for batch message (Expected send_message)">>}
            )
    end.
%% True only for the atom `send_message`.
is_send_message_atom(Key) ->
    Key =:= send_message.
%% Renders a non-empty list of objects into one insert statement: the first
%% object through the insert template, every other object through the extend
%% template. Raises an error for an empty list or missing templates.
objects_to_sql(
    [First | Rest],
    #{
        send_message_template := InsertTemplate,
        extend_send_message_template := ExtendTemplate
    }
) ->
    %% INSERT statement including the first row
    Head = emqx_plugin_libs_rule:proc_tmpl(InsertTemplate, First),
    %% One separator-prefixed row per remaining object
    Tail = [emqx_plugin_libs_rule:proc_tmpl(ExtendTemplate, Object) || Object <- Rest],
    erlang:iolist_to_binary([Head, Tail]);
objects_to_sql(_, _) ->
    erlang:error(<<"Templates for bulk insert missing.">>).
%% -------------------------------------------------------------------
%% Helper functions that are used by both on_query/3 and on_batch_query/3
%% -------------------------------------------------------------------
%% Used by on_query/3 and on_batch_query/3: picks a pool worker and runs the
%% SQL through execute_sql_in_clickhouse_server_using_connection/2.
execute_sql_in_clickhouse_server(PoolName, SQL) ->
    Action = {?MODULE, execute_sql_in_clickhouse_server_using_connection, [SQL]},
    ecpool:pick_and_do(PoolName, Action, no_handover).
%% Runs SQL on the given connection via clickhouse:query/3 with an empty
%% option list. Exported so it can be invoked through ecpool.
execute_sql_in_clickhouse_server_using_connection(Connection, SQL) ->
    clickhouse:query(Connection, SQL, []).
%% Transforms the result received from clickhouse into something a little
%% more readable and creates appropriate log messages:
%%  - `{ok, 200, <<>>}`  becomes `ok`
%%  - `{ok, 200, Data}`  becomes `{ok, Data}`
%%  - anything else is logged as an error and returned as `{error, Result}`
transform_and_log_clickhouse_result({ok, 200, Data}, _ResourceID, _SQL) ->
    Result =
        case Data of
            <<"">> -> ok;
            _ -> {ok, Data}
        end,
    snabbkaffe_log_return(Result),
    Result;
transform_and_log_clickhouse_result(ClickhouseErrorResult, ResourceID, SQL) ->
    ?SLOG(error, #{
        msg => "clickhouse connector do sql query failed",
        connector => ResourceID,
        sql => SQL,
        reason => ClickhouseErrorResult
    }),
    {error, ClickhouseErrorResult}.
%% Emits a trace point carrying the result returned for a query.
snabbkaffe_log_return(Result) ->
    ?tp(clickhouse_connector_query_return, #{result => Result}).

View File

@ -0,0 +1,198 @@
% %%--------------------------------------------------------------------
% %% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
% %%
% %% Licensed under the Apache License, Version 2.0 (the "License");
% %% you may not use this file except in compliance with the License.
% %% You may obtain a copy of the License at
% %% http://www.apache.org/licenses/LICENSE-2.0
% %%
% %% Unless required by applicable law or agreed to in writing, software
% %% distributed under the License is distributed on an "AS IS" BASIS,
% %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
% %% See the License for the specific language governing permissions and
% %% limitations under the License.
% %%--------------------------------------------------------------------
-module(ee_connector_clickhouse_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include("emqx_connector.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("stdlib/include/assert.hrl").
-define(CLICKHOUSE_HOST, "clickhouse").
-define(CLICKHOUSE_RESOURCE_MOD, emqx_ee_connector_clickhouse).
%% This test SUITE requires a running clickhouse instance. If you don't want to
%% bring up the whole CI infrastructure with the `scripts/ct/run.sh` script
%% you can create a clickhouse instance with the following command (execute it
%% from the root of the EMQX directory). You also need to set ?CLICKHOUSE_HOST and
%% ?CLICKHOUSE_PORT to appropriate values.
%%
%% docker run -d -p 18123:8123 -p19000:9000 --name some-clickhouse-server --ulimit nofile=262144:262144 -v "`pwd`/.ci/docker-compose-file/clickhouse/users.xml:/etc/clickhouse-server/users.xml" -v "`pwd`/.ci/docker-compose-file/clickhouse/config.xml:/etc/clickhouse-server/config.xml" clickhouse/clickhouse-server
%% The list of test cases is delegated to emqx_common_test_helpers:all/1,
%% which is given this suite's module name.
all() ->
    Suite = ?MODULE,
    emqx_common_test_helpers:all(Suite).
%% This suite defines no Common Test groups.
groups() -> [].
%% Builds the HTTP URL (as a binary) from ?CLICKHOUSE_HOST and
%% ?CLICKHOUSE_DEFAULT_PORT.
clickhouse_url() ->
    Scheme = <<"http://">>,
    Port = erlang:integer_to_list(?CLICKHOUSE_DEFAULT_PORT),
    UrlParts = [Scheme, ?CLICKHOUSE_HOST, ":", Port],
    erlang:iolist_to_binary(UrlParts).
%% Suite setup. When the ClickHouse server is reachable, the required
%% applications are started and the `mqtt` database is created if missing.
%% When it is not reachable, the suite fails if the IS_CI environment variable
%% is "yes" and is skipped otherwise.
init_per_suite(Config) ->
    ServerIsUp =
        emqx_common_test_helpers:is_tcp_server_available(
            ?CLICKHOUSE_HOST, ?CLICKHOUSE_DEFAULT_PORT
        ),
    case ServerIsUp of
        false ->
            case os:getenv("IS_CI") of
                "yes" -> throw(no_clickhouse);
                _NotCI -> {skip, no_clickhouse}
            end;
        true ->
            ok = emqx_common_test_helpers:start_apps([emqx_conf]),
            ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
            {ok, _} = application:ensure_all_started(emqx_connector),
            {ok, _} = application:ensure_all_started(emqx_ee_connector),
            %% Use a short-lived client to make sure the database exists
            ClientOptions = [
                {url, clickhouse_url()},
                {user, <<"default">>},
                {key, "public"},
                {pool, tmp_pool}
            ],
            {ok, Client} = clickhouse:start_link(ClientOptions),
            {ok, _, _} =
                clickhouse:query(Client, <<"CREATE DATABASE IF NOT EXISTS mqtt">>, #{}),
            clickhouse:stop(Client),
            Config
    end.
%% Suite teardown: stops the applications that init_per_suite/1 started.
%% The results of application:stop/1 are deliberately ignored so that teardown
%% does not fail when an application is already stopped.
end_per_suite(_Config) ->
    ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
    ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
    %% emqx_ee_connector is started in init_per_suite/1, so it has to be
    %% stopped here as well; otherwise it keeps running for later suites.
    _ = application:stop(emqx_ee_connector),
    _ = application:stop(emqx_connector),
    ok.
%% No per-testcase setup is needed; the configuration is passed through as is.
init_per_testcase(_TestCase, Config) -> Config.
%% No per-testcase cleanup is needed.
end_per_testcase(_TestCase, _Config) -> ok.
% %%------------------------------------------------------------------------------
% %% Testcases
% %%------------------------------------------------------------------------------
%% Runs the full lifecycle check against a resource created from
%% clickhouse_config/0.
t_lifecycle(_Config) ->
    ResourceId = <<"emqx_connector_clickhouse_SUITE">>,
    ResourceConfig = clickhouse_config(),
    perform_lifecycle_check(ResourceId, ResourceConfig).
%% Debug helper: prints `Term` with erlang:display/1 and returns it unchanged,
%% so it can be wrapped around any expression.
show(Term) ->
    _ = erlang:display(Term),
    Term.
%% Debug helper: prints `{Label, Term}` with erlang:display/1 and returns
%% `Term` unchanged.
show(Label, Term) ->
    Labelled = {Label, Term},
    _ = erlang:display(Labelled),
    Term.
%% Drives a ClickHouse resource through create -> query -> stop -> restart ->
%% query -> remove, asserting the reported state and health after every step.
%%
%% PoolName is used as the resource id; InitialConfig is the raw configuration
%% that is first validated with emqx_resource:check_config/2.
perform_lifecycle_check(PoolName, InitialConfig) ->
    {ok, #{config := CheckedConfig}} =
        emqx_resource:check_config(?CLICKHOUSE_RESOURCE_MOD, InitialConfig),
    {ok, #{
        state := #{poolname := ReturnedPoolName} = State,
        status := InitialStatus
    }} =
        emqx_resource:create_local(
            PoolName,
            ?CONNECTOR_RESOURCE_GROUP,
            ?CLICKHOUSE_RESOURCE_MOD,
            CheckedConfig,
            #{}
        ),
    ?assertEqual(connected, InitialStatus),
    % Instance should match the state and status of the just started resource
    {ok, ?CONNECTOR_RESOURCE_GROUP, #{
        state := State,
        status := InitialStatus
    }} =
        emqx_resource:get_instance(PoolName),
    ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
    % Perform query as further check that the resource is working as expected
    assert_smoke_query_succeeds(PoolName),
    ?assertEqual(ok, emqx_resource:stop(PoolName)),
    % Resource will be listed still, but state will be changed and healthcheck will fail
    % as the worker no longer exists.
    {ok, ?CONNECTOR_RESOURCE_GROUP, #{
        state := State,
        status := StoppedStatus
    }} =
        emqx_resource:get_instance(PoolName),
    ?assertEqual(stopped, StoppedStatus),
    ?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(PoolName)),
    % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
    ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
    % Can call stop/1 again on an already stopped instance
    ?assertEqual(ok, emqx_resource:stop(PoolName)),
    % Make sure it can be restarted and the healthchecks and queries work properly
    ?assertEqual(ok, emqx_resource:restart(PoolName)),
    % async restart, need to wait resource
    timer:sleep(500),
    {ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} =
        emqx_resource:get_instance(PoolName),
    ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
    assert_smoke_query_succeeds(PoolName),
    % Stop and remove the resource in one go.
    ?assertEqual(ok, emqx_resource:remove_local(PoolName)),
    ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
    % Should not even be able to get the resource data out of ets now unlike just stopping.
    ?assertEqual({error, not_found}, emqx_resource:get_instance(PoolName)).

%% Sends the parameterless test query through the resource and asserts that
%% it succeeds and that the trimmed response body is <<"1">>.
assert_smoke_query_succeeds(PoolName) ->
    QueryResult = emqx_resource:query(PoolName, test_query_no_params()),
    ?assertMatch({ok, _}, QueryResult),
    {ok, ResponseBody} = QueryResult,
    ?assertEqual(<<"1">>, string:trim(ResponseBody)).
% %%------------------------------------------------------------------------------
% %% Helpers
% %%------------------------------------------------------------------------------
%% Raw resource configuration used by the lifecycle test, wrapped under the
%% <<"config">> key. The URL comes from clickhouse_url/0 so that the resource
%% and the setup client in init_per_suite/1 always target the same server.
clickhouse_config() ->
    Config =
        #{
            auto_reconnect => true,
            database => <<"mqtt">>,
            username => <<"default">>,
            password => <<"public">>,
            pool_size => 8,
            url => clickhouse_url()
        },
    #{<<"config">> => Config}.
%% Request used as a smoke test: a plain `SELECT 1` without parameters.
test_query_no_params() ->
    Statement = <<"SELECT 1">>,
    {query, Statement}.

View File

@ -162,6 +162,9 @@ for dep in ${CT_DEPS}; do
tdengine)
FILES+=( '.ci/docker-compose-file/docker-compose-tdengine-restful.yaml' )
;;
clickhouse)
FILES+=( '.ci/docker-compose-file/docker-compose-clickhouse.yaml' )
;;
*)
echo "unknown_ct_dependency $dep"
exit 1