Merge pull request #9985 from kjellwinblad/kjell/feat/clickhouse_bridge_2/EMQX-8391
feat: add clickhouse database bridge
This commit is contained in:
commit
a638cc6566
|
@ -0,0 +1,678 @@
|
|||
<?xml version="1.0"?>
|
||||
<!--
|
||||
NOTE: User and query level settings are set up in "users.xml" file.
|
||||
If you have accidentally specified user-level settings here, the server won't start.
|
||||
You can either move the settings to the right place inside "users.xml" file
|
||||
or add <skip_check_for_incorrect_settings>1</skip_check_for_incorrect_settings> here.
|
||||
-->
|
||||
<yandex>
|
||||
<logger>
|
||||
<!-- Possible levels: https://github.com/pocoproject/poco/blob/poco-1.9.4-release/Foundation/include/Poco/Logger.h#L105 -->
|
||||
<level>trace</level>
|
||||
<log>/var/log/clickhouse-server/clickhouse-server.log</log>
|
||||
<errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
|
||||
<size>1000M</size>
|
||||
<count>10</count>
|
||||
<!-- <console>1</console> --> <!-- Default behavior is autodetection (log to console if not daemon mode and is tty) -->
|
||||
|
||||
<!-- Per level overrides (legacy):
|
||||
|
||||
For example to suppress logging of the ConfigReloader you can use:
|
||||
NOTE: levels.logger is reserved, see below.
|
||||
-->
|
||||
<!--
|
||||
<levels>
|
||||
<ConfigReloader>none</ConfigReloader>
|
||||
</levels>
|
||||
-->
|
||||
|
||||
<!-- Per level overrides:
|
||||
|
||||
For example to suppress logging of the RBAC for default user you can use:
|
||||
(But please note that the logger name maybe changed from version to version, even after minor upgrade)
|
||||
-->
|
||||
<!--
|
||||
<levels>
|
||||
<logger>
|
||||
<name>ContextAccess (default)</name>
|
||||
<level>none</level>
|
||||
</logger>
|
||||
<logger>
|
||||
<name>DatabaseOrdinary (test)</name>
|
||||
<level>none</level>
|
||||
</logger>
|
||||
</levels>
|
||||
-->
|
||||
</logger>
|
||||
|
||||
<send_crash_reports>
|
||||
<!-- Changing <enabled> to true allows sending crash reports to -->
|
||||
<!-- the ClickHouse core developers team via Sentry https://sentry.io -->
|
||||
<!-- Doing so at least in pre-production environments is highly appreciated -->
|
||||
<enabled>false</enabled>
|
||||
<!-- Change <anonymize> to true if you don't feel comfortable attaching the server hostname to the crash report -->
|
||||
<anonymize>false</anonymize>
|
||||
<!-- Default endpoint should be changed to different Sentry DSN only if you have -->
|
||||
<!-- some in-house engineers or hired consultants who're going to debug ClickHouse issues for you -->
|
||||
<endpoint>https://6f33034cfe684dd7a3ab9875e57b1c8d@o388870.ingest.sentry.io/5226277</endpoint>
|
||||
</send_crash_reports>
|
||||
|
||||
<!--display_name>production</display_name--> <!-- It is the name that will be shown in the client -->
|
||||
<http_port>8123</http_port>
|
||||
<tcp_port>9000</tcp_port>
|
||||
<mysql_port>9004</mysql_port>
|
||||
<!-- For HTTPS and SSL over native protocol. -->
|
||||
<!--
|
||||
<https_port>8443</https_port>
|
||||
<tcp_port_secure>9440</tcp_port_secure>
|
||||
-->
|
||||
<!-- Used with https_port and tcp_port_secure. Full ssl options list: https://github.com/ClickHouse-Extras/poco/blob/master/NetSSL_OpenSSL/include/Poco/Net/SSLManager.h#L71 -->
|
||||
<openSSL>
|
||||
<server> <!-- Used for https server AND secure tcp port -->
|
||||
<!-- openssl req -subj "/CN=localhost" -new -newkey rsa:2048 -days 365 -nodes -x509 -keyout /etc/clickhouse-server/server.key -out /etc/clickhouse-server/server.crt -->
|
||||
<certificateFile>/etc/clickhouse-server/server.crt</certificateFile>
|
||||
<privateKeyFile>/etc/clickhouse-server/server.key</privateKeyFile>
|
||||
<!-- openssl dhparam -out /etc/clickhouse-server/dhparam.pem 4096 -->
|
||||
<dhParamsFile>/etc/clickhouse-server/dhparam.pem</dhParamsFile>
|
||||
<verificationMode>none</verificationMode>
|
||||
<loadDefaultCAFile>true</loadDefaultCAFile>
|
||||
<cacheSessions>true</cacheSessions>
|
||||
<disableProtocols>sslv2,sslv3</disableProtocols>
|
||||
<preferServerCiphers>true</preferServerCiphers>
|
||||
</server>
|
||||
|
||||
<client> <!-- Used for connecting to https dictionary source and secured Zookeeper communication -->
|
||||
<loadDefaultCAFile>true</loadDefaultCAFile>
|
||||
<cacheSessions>true</cacheSessions>
|
||||
<disableProtocols>sslv2,sslv3</disableProtocols>
|
||||
<preferServerCiphers>true</preferServerCiphers>
|
||||
<!-- Use for self-signed: <verificationMode>none</verificationMode> -->
|
||||
<invalidCertificateHandler>
|
||||
<!-- Use for self-signed: <name>AcceptCertificateHandler</name> -->
|
||||
<name>RejectCertificateHandler</name>
|
||||
</invalidCertificateHandler>
|
||||
</client>
|
||||
</openSSL>
|
||||
|
||||
<!-- Default root page on http[s] server. For example load UI from https://tabix.io/ when opening http://localhost:8123 -->
|
||||
<!--
|
||||
<http_server_default_response><![CDATA[<html ng-app="SMI2"><head><base href="http://ui.tabix.io/"></head><body><div ui-view="" class="content-ui"></div><script src="http://loader.tabix.io/master.js"></script></body></html>]]></http_server_default_response>
|
||||
-->
|
||||
|
||||
<!-- Port for communication between replicas. Used for data exchange. -->
|
||||
<interserver_http_port>9009</interserver_http_port>
|
||||
|
||||
<!-- Hostname that is used by other replicas to request this server.
|
||||
If not specified, then it is determined analogously to the 'hostname -f' command.
|
||||
This setting could be used to switch replication to another network interface.
|
||||
-->
|
||||
<!--
|
||||
<interserver_http_host>example.yandex.ru</interserver_http_host>
|
||||
-->
|
||||
|
||||
<!-- Listen specified host. use :: (wildcard IPv6 address), if you want to accept connections both with IPv4 and IPv6 from everywhere. -->
|
||||
<!-- <listen_host>::</listen_host> -->
|
||||
<!-- Same for hosts with disabled ipv6: -->
|
||||
<!-- <listen_host>0.0.0.0</listen_host> -->
|
||||
|
||||
<!-- Default values - try listen localhost on ipv4 and ipv6: -->
|
||||
<!--
|
||||
<listen_host>::1</listen_host>
|
||||
<listen_host>127.0.0.1</listen_host>
|
||||
-->
|
||||
<!-- Don't exit if ipv6 or ipv4 unavailable, but listen_host with this protocol specified -->
|
||||
<!-- <listen_try>0</listen_try> -->
|
||||
|
||||
<!-- Allow listen on same address:port -->
|
||||
<!-- <listen_reuse_port>0</listen_reuse_port> -->
|
||||
|
||||
<!-- <listen_backlog>64</listen_backlog> -->
|
||||
|
||||
<max_connections>4096</max_connections>
|
||||
<keep_alive_timeout>3</keep_alive_timeout>
|
||||
|
||||
<!-- Maximum number of concurrent queries. -->
|
||||
<max_concurrent_queries>100</max_concurrent_queries>
|
||||
|
||||
<!-- Maximum memory usage (resident set size) for server process.
|
||||
Zero value or unset means default. Default is "max_server_memory_usage_to_ram_ratio" of available physical RAM.
|
||||
If the value is larger than "max_server_memory_usage_to_ram_ratio" of available physical RAM, it will be cut down.
|
||||
|
||||
The constraint is checked on query execution time.
|
||||
If a query tries to allocate memory and the current memory usage plus allocation is greater
|
||||
than specified threshold, exception will be thrown.
|
||||
|
||||
It is not practical to set this constraint to small values like just a few gigabytes,
|
||||
because memory allocator will keep this amount of memory in caches and the server will deny service of queries.
|
||||
-->
|
||||
<max_server_memory_usage>0</max_server_memory_usage>
|
||||
|
||||
<!-- Maximum number of threads in the Global thread pool.
|
||||
This will default to a maximum of 10000 threads if not specified.
|
||||
This setting will be useful in scenarios where there are a large number
|
||||
of distributed queries that are running concurrently but are idling most
|
||||
of the time, in which case a higher number of threads might be required.
|
||||
-->
|
||||
|
||||
<max_thread_pool_size>10000</max_thread_pool_size>
|
||||
|
||||
<!-- On memory constrained environments you may have to set this to value larger than 1.
|
||||
-->
|
||||
<max_server_memory_usage_to_ram_ratio>10</max_server_memory_usage_to_ram_ratio>
|
||||
|
||||
<!-- Simple server-wide memory profiler. Collect a stack trace at every peak allocation step (in bytes).
|
||||
Data will be stored in system.trace_log table with query_id = empty string.
|
||||
Zero means disabled.
|
||||
-->
|
||||
<total_memory_profiler_step>4194304</total_memory_profiler_step>
|
||||
|
||||
<!-- Collect random allocations and deallocations and write them into system.trace_log with 'MemorySample' trace_type.
|
||||
The probability is for every alloc/free regardless to the size of the allocation.
|
||||
Note that sampling happens only when the amount of untracked memory exceeds the untracked memory limit,
|
||||
which is 4 MiB by default but can be lowered if 'total_memory_profiler_step' is lowered.
|
||||
You may want to set 'total_memory_profiler_step' to 1 for extra fine grained sampling.
|
||||
-->
|
||||
<total_memory_tracker_sample_probability>0</total_memory_tracker_sample_probability>
|
||||
|
||||
<!-- Set limit on number of open files (default: maximum). This setting makes sense on Mac OS X because getrlimit() fails to retrieve
|
||||
correct maximum value. -->
|
||||
<!-- <max_open_files>262144</max_open_files> -->
|
||||
|
||||
<!-- Size of cache of uncompressed blocks of data, used in tables of MergeTree family.
|
||||
In bytes. Cache is single for server. Memory is allocated only on demand.
|
||||
Cache is used when 'use_uncompressed_cache' user setting turned on (off by default).
|
||||
Uncompressed cache is advantageous only for very short queries and in rare cases.
|
||||
-->
|
||||
<uncompressed_cache_size>8589934592</uncompressed_cache_size>
|
||||
|
||||
<!-- Approximate size of mark cache, used in tables of MergeTree family.
|
||||
In bytes. Cache is single for server. Memory is allocated only on demand.
|
||||
You should not lower this value.
|
||||
-->
|
||||
<mark_cache_size>5368709120</mark_cache_size>
|
||||
|
||||
|
||||
<!-- Path to data directory, with trailing slash. -->
|
||||
<path>/var/lib/clickhouse/</path>
|
||||
|
||||
<!-- Path to temporary data for processing hard queries. -->
|
||||
<tmp_path>/var/lib/clickhouse/tmp/</tmp_path>
|
||||
|
||||
<!-- Policy from the <storage_configuration> for the temporary files.
|
||||
If not set <tmp_path> is used, otherwise <tmp_path> is ignored.
|
||||
|
||||
Notes:
|
||||
- move_factor is ignored
|
||||
- keep_free_space_bytes is ignored
|
||||
- max_data_part_size_bytes is ignored
|
||||
- you must have exactly one volume in that policy
|
||||
-->
|
||||
<!-- <tmp_policy>tmp</tmp_policy> -->
|
||||
|
||||
<!-- Directory with user provided files that are accessible by 'file' table function. -->
|
||||
<user_files_path>/var/lib/clickhouse/user_files/</user_files_path>
|
||||
|
||||
<!-- Path to folder where users and roles created by SQL commands are stored. -->
|
||||
<access_control_path>/var/lib/clickhouse/access/</access_control_path>
|
||||
|
||||
<!-- Path to configuration file with users, access rights, profiles of settings, quotas. -->
|
||||
<users_config>/etc/clickhouse-server/users.xml</users_config>
|
||||
|
||||
<!-- Default profile of settings. -->
|
||||
<default_profile>default</default_profile>
|
||||
|
||||
<!-- System profile of settings. These settings are used by internal processes (Buffer storage, Distributed DDL worker and so on). -->
|
||||
<!-- <system_profile>default</system_profile> -->
|
||||
|
||||
<!-- Default database. -->
|
||||
<default_database>default</default_database>
|
||||
|
||||
<!-- Server time zone could be set here.
|
||||
|
||||
Time zone is used when converting between String and DateTime types,
|
||||
when printing DateTime in text formats and parsing DateTime from text,
|
||||
it is used in date and time related functions, if specific time zone was not passed as an argument.
|
||||
|
||||
Time zone is specified as identifier from IANA time zone database, like UTC or Africa/Abidjan.
|
||||
If not specified, system time zone at server startup is used.
|
||||
|
||||
Please note, that server could display time zone alias instead of specified name.
|
||||
Example: W-SU is an alias for Europe/Moscow and Zulu is an alias for UTC.
|
||||
-->
|
||||
<!-- <timezone>Europe/Moscow</timezone> -->
|
||||
|
||||
<!-- You can specify umask here (see "man umask"). Server will apply it on startup.
|
||||
Number is always parsed as octal. Default umask is 027 (other users cannot read logs, data files, etc; group can only read).
|
||||
-->
|
||||
<!-- <umask>022</umask> -->
|
||||
|
||||
<!-- Perform mlockall after startup to lower first queries latency
|
||||
and to prevent clickhouse executable from being paged out under high IO load.
|
||||
Enabling this option is recommended but will lead to increased startup time for up to a few seconds.
|
||||
-->
|
||||
<mlock_executable>true</mlock_executable>
|
||||
|
||||
<!-- Configuration of clusters that could be used in Distributed tables.
|
||||
https://clickhouse.tech/docs/en/operations/table_engines/distributed/
|
||||
-->
|
||||
<remote_servers incl="clickhouse_remote_servers" >
|
||||
<!-- Test only shard config for testing distributed storage -->
|
||||
<test_shard_localhost>
|
||||
<shard>
|
||||
<!-- Optional. Whether to write data to just one of the replicas. Default: false (write data to all replicas). -->
|
||||
<!-- <internal_replication>false</internal_replication> -->
|
||||
<!-- Optional. Shard weight when writing data. Default: 1. -->
|
||||
<!-- <weight>1</weight> -->
|
||||
<replica>
|
||||
<host>localhost</host>
|
||||
<port>9000</port>
|
||||
<!-- Optional. Priority of the replica for load_balancing. Default: 1 (less value has more priority). -->
|
||||
<!-- <priority>1</priority> -->
|
||||
</replica>
|
||||
</shard>
|
||||
</test_shard_localhost>
|
||||
<test_cluster_two_shards_localhost>
|
||||
<shard>
|
||||
<replica>
|
||||
<host>localhost</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
</shard>
|
||||
<shard>
|
||||
<replica>
|
||||
<host>localhost</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
</shard>
|
||||
</test_cluster_two_shards_localhost>
|
||||
<test_cluster_two_shards>
|
||||
<shard>
|
||||
<replica>
|
||||
<host>127.0.0.1</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
</shard>
|
||||
<shard>
|
||||
<replica>
|
||||
<host>127.0.0.2</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
</shard>
|
||||
</test_cluster_two_shards>
|
||||
<test_shard_localhost_secure>
|
||||
<shard>
|
||||
<replica>
|
||||
<host>localhost</host>
|
||||
<port>9440</port>
|
||||
<secure>1</secure>
|
||||
</replica>
|
||||
</shard>
|
||||
</test_shard_localhost_secure>
|
||||
<test_unavailable_shard>
|
||||
<shard>
|
||||
<replica>
|
||||
<host>localhost</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
</shard>
|
||||
<shard>
|
||||
<replica>
|
||||
<host>localhost</host>
|
||||
<port>1</port>
|
||||
</replica>
|
||||
</shard>
|
||||
</test_unavailable_shard>
|
||||
</remote_servers>
|
||||
|
||||
<!-- The list of hosts allowed to use in URL-related storage engines and table functions.
|
||||
If this section is not present in configuration, all hosts are allowed.
|
||||
-->
|
||||
<remote_url_allow_hosts>
|
||||
<!-- Host should be specified exactly as in URL. The name is checked before DNS resolution.
|
||||
Example: "yandex.ru", "yandex.ru." and "www.yandex.ru" are different hosts.
|
||||
If port is explicitly specified in URL, the host:port is checked as a whole.
|
||||
If host specified here without port, any port with this host allowed.
|
||||
"yandex.ru" -> "yandex.ru:443", "yandex.ru:80" etc. is allowed, but "yandex.ru:80" -> only "yandex.ru:80" is allowed.
|
||||
If the host is specified as IP address, it is checked as specified in URL. Example: "[2a02:6b8:a::a]".
|
||||
If there are redirects and support for redirects is enabled, every redirect (the Location field) is checked.
|
||||
-->
|
||||
|
||||
<!-- Regular expression can be specified. RE2 engine is used for regexps.
|
||||
Regexps are not aligned: don't forget to add ^ and $. Also don't forget to escape dot (.) metacharacter
|
||||
(forgetting to do so is a common source of error).
|
||||
-->
|
||||
</remote_url_allow_hosts>
|
||||
|
||||
<!-- If an element has an 'incl' attribute, then the corresponding substitution from another file will be used as its value.
|
||||
By default, path to file with substitutions is /etc/metrika.xml. It could be changed in config in 'include_from' element.
|
||||
Values for substitutions are specified in /yandex/name_of_substitution elements in that file.
|
||||
-->
|
||||
|
||||
<!-- ZooKeeper is used to store metadata about replicas, when using Replicated tables.
|
||||
Optional. If you don't use replicated tables, you could omit that.
|
||||
|
||||
See https://clickhouse.yandex/docs/en/table_engines/replication/
|
||||
-->
|
||||
|
||||
<zookeeper incl="zookeeper-servers" optional="true" />
|
||||
|
||||
<!-- Substitutions for parameters of replicated tables.
|
||||
Optional. If you don't use replicated tables, you could omit that.
|
||||
|
||||
See https://clickhouse.yandex/docs/en/table_engines/replication/#creating-replicated-tables
|
||||
-->
|
||||
<macros incl="macros" optional="true" />
|
||||
|
||||
|
||||
<!-- Reloading interval for embedded dictionaries, in seconds. Default: 3600. -->
|
||||
<builtin_dictionaries_reload_interval>3600</builtin_dictionaries_reload_interval>
|
||||
|
||||
|
||||
<!-- Maximum session timeout, in seconds. Default: 3600. -->
|
||||
<max_session_timeout>3600</max_session_timeout>
|
||||
|
||||
<!-- Default session timeout, in seconds. Default: 60. -->
|
||||
<default_session_timeout>60</default_session_timeout>
|
||||
|
||||
<!-- Sending data to Graphite for monitoring. Several sections can be defined. -->
|
||||
<!--
|
||||
interval - send every X second
|
||||
root_path - prefix for keys
|
||||
hostname_in_path - append hostname to root_path (default = true)
|
||||
metrics - send data from table system.metrics
|
||||
events - send data from table system.events
|
||||
asynchronous_metrics - send data from table system.asynchronous_metrics
|
||||
-->
|
||||
<!--
|
||||
<graphite>
|
||||
<host>localhost</host>
|
||||
<port>42000</port>
|
||||
<timeout>0.1</timeout>
|
||||
<interval>60</interval>
|
||||
<root_path>one_min</root_path>
|
||||
<hostname_in_path>true</hostname_in_path>
|
||||
|
||||
<metrics>true</metrics>
|
||||
<events>true</events>
|
||||
<events_cumulative>false</events_cumulative>
|
||||
<asynchronous_metrics>true</asynchronous_metrics>
|
||||
</graphite>
|
||||
<graphite>
|
||||
<host>localhost</host>
|
||||
<port>42000</port>
|
||||
<timeout>0.1</timeout>
|
||||
<interval>1</interval>
|
||||
<root_path>one_sec</root_path>
|
||||
|
||||
<metrics>true</metrics>
|
||||
<events>true</events>
|
||||
<events_cumulative>false</events_cumulative>
|
||||
<asynchronous_metrics>false</asynchronous_metrics>
|
||||
</graphite>
|
||||
-->
|
||||
|
||||
<!-- Serve endpoint for Prometheus monitoring. -->
|
||||
<!--
|
||||
endpoint - metrics path (relative to root, starting with "/")
|
||||
port - port to set up the server on. If not defined or 0, then http_port is used
|
||||
metrics - send data from table system.metrics
|
||||
events - send data from table system.events
|
||||
asynchronous_metrics - send data from table system.asynchronous_metrics
|
||||
status_info - send data from different component from CH, ex: Dictionaries status
|
||||
-->
|
||||
<!--
|
||||
<prometheus>
|
||||
<endpoint>/metrics</endpoint>
|
||||
<port>9363</port>
|
||||
|
||||
<metrics>true</metrics>
|
||||
<events>true</events>
|
||||
<asynchronous_metrics>true</asynchronous_metrics>
|
||||
<status_info>true</status_info>
|
||||
</prometheus>
|
||||
-->
|
||||
|
||||
<!-- Query log. Used only for queries with setting log_queries = 1. -->
|
||||
<query_log>
|
||||
<!-- What table to insert data into. If the table does not exist, it will be created.
|
||||
When query log structure is changed after system update,
|
||||
then old table will be renamed and new table will be created automatically.
|
||||
-->
|
||||
<database>system</database>
|
||||
<table>query_log</table>
|
||||
<!--
|
||||
PARTITION BY expr https://clickhouse.yandex/docs/en/table_engines/custom_partitioning_key/
|
||||
Example:
|
||||
event_date
|
||||
toMonday(event_date)
|
||||
toYYYYMM(event_date)
|
||||
toStartOfHour(event_time)
|
||||
-->
|
||||
<partition_by>toYYYYMM(event_date)</partition_by>
|
||||
|
||||
<!-- Instead of partition_by, you can provide full engine expression (starting with ENGINE = ) with parameters,
|
||||
Example: <engine>ENGINE = MergeTree PARTITION BY toYYYYMM(event_date) ORDER BY (event_date, event_time) SETTINGS index_granularity = 1024</engine>
|
||||
-->
|
||||
|
||||
<!-- Interval of flushing data. -->
|
||||
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
|
||||
</query_log>
|
||||
|
||||
<!-- Trace log. Stores stack traces collected by query profilers.
|
||||
See query_profiler_real_time_period_ns and query_profiler_cpu_time_period_ns settings. -->
|
||||
<trace_log>
|
||||
<database>system</database>
|
||||
<table>trace_log</table>
|
||||
|
||||
<partition_by>toYYYYMM(event_date)</partition_by>
|
||||
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
|
||||
</trace_log>
|
||||
|
||||
<!-- Query thread log. Has information about all threads participated in query execution.
|
||||
Used only for queries with setting log_query_threads = 1. -->
|
||||
<query_thread_log>
|
||||
<database>system</database>
|
||||
<table>query_thread_log</table>
|
||||
<partition_by>toYYYYMM(event_date)</partition_by>
|
||||
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
|
||||
</query_thread_log>
|
||||
|
||||
<!-- Uncomment if use part log.
|
||||
Part log contains information about all actions with parts in MergeTree tables (creation, deletion, merges, downloads).
|
||||
<part_log>
|
||||
<database>system</database>
|
||||
<table>part_log</table>
|
||||
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
|
||||
</part_log>
|
||||
-->
|
||||
|
||||
<!-- Uncomment to write text log into table.
|
||||
Text log contains all information from usual server log but stores it in structured and efficient way.
|
||||
The level of the messages that goes to the table can be limited (<level>), if not specified all messages will go to the table.
|
||||
<text_log>
|
||||
<database>system</database>
|
||||
<table>text_log</table>
|
||||
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
|
||||
<level></level>
|
||||
</text_log>
|
||||
-->
|
||||
|
||||
<!-- Metric log contains rows with current values of ProfileEvents, CurrentMetrics collected with "collect_interval_milliseconds" interval. -->
|
||||
<metric_log>
|
||||
<database>system</database>
|
||||
<table>metric_log</table>
|
||||
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
|
||||
<collect_interval_milliseconds>1000</collect_interval_milliseconds>
|
||||
</metric_log>
|
||||
|
||||
<!--
|
||||
Asynchronous metric log contains values of metrics from
|
||||
system.asynchronous_metrics.
|
||||
-->
|
||||
<asynchronous_metric_log>
|
||||
<database>system</database>
|
||||
<table>asynchronous_metric_log</table>
|
||||
<!--
|
||||
Asynchronous metrics are updated once a minute, so there is
|
||||
no need to flush more often.
|
||||
-->
|
||||
<flush_interval_milliseconds>60000</flush_interval_milliseconds>
|
||||
</asynchronous_metric_log>
|
||||
|
||||
<!-- Parameters for embedded dictionaries, used in Yandex.Metrica.
|
||||
See https://clickhouse.yandex/docs/en/dicts/internal_dicts/
|
||||
-->
|
||||
|
||||
<!-- Path to file with region hierarchy. -->
|
||||
<!-- <path_to_regions_hierarchy_file>/opt/geo/regions_hierarchy.txt</path_to_regions_hierarchy_file> -->
|
||||
|
||||
<!-- Path to directory with files containing names of regions -->
|
||||
<!-- <path_to_regions_names_files>/opt/geo/</path_to_regions_names_files> -->
|
||||
|
||||
|
||||
<!-- Configuration of external dictionaries. See:
|
||||
https://clickhouse.yandex/docs/en/dicts/external_dicts/
|
||||
-->
|
||||
<dictionaries_config>*_dictionary.xml</dictionaries_config>
|
||||
|
||||
<!-- Uncomment if you want data to be compressed 30-100% better.
|
||||
Don't do that if you just started using ClickHouse.
|
||||
-->
|
||||
<compression incl="clickhouse_compression">
|
||||
<!--
|
||||
<!- - Set of variants. Checked in order. Last matching case wins. If nothing matches, lz4 will be used. - ->
|
||||
<case>
|
||||
|
||||
<!- - Conditions. All must be satisfied. Some conditions may be omitted. - ->
|
||||
<min_part_size>10000000000</min_part_size> <!- - Min part size in bytes. - ->
|
||||
<min_part_size_ratio>0.01</min_part_size_ratio> <!- - Min size of part relative to whole table size. - ->
|
||||
|
||||
<!- - What compression method to use. - ->
|
||||
<method>zstd</method>
|
||||
</case>
|
||||
-->
|
||||
</compression>
|
||||
|
||||
<!-- Allow to execute distributed DDL queries (CREATE, DROP, ALTER, RENAME) on cluster.
|
||||
Works only if ZooKeeper is enabled. Comment it if such functionality isn't required. -->
|
||||
<distributed_ddl>
|
||||
<!-- Path in ZooKeeper to queue with DDL queries -->
|
||||
<path>/clickhouse/task_queue/ddl</path>
|
||||
|
||||
<!-- Settings from this profile will be used to execute DDL queries -->
|
||||
<!-- <profile>default</profile> -->
|
||||
</distributed_ddl>
|
||||
|
||||
<!-- Settings to fine tune MergeTree tables. See documentation in source code, in MergeTreeSettings.h -->
|
||||
<!--
|
||||
<merge_tree>
|
||||
<max_suspicious_broken_parts>5</max_suspicious_broken_parts>
|
||||
</merge_tree>
|
||||
-->
|
||||
|
||||
<!-- Protection from accidental DROP.
|
||||
If the size of a MergeTree table is greater than max_table_size_to_drop (in bytes), then the table cannot be dropped with any DROP query.
|
||||
If you want to delete one table and don't want to change the clickhouse-server config, you can create a special file <clickhouse-path>/flags/force_drop_table and run DROP once.
|
||||
By default max_table_size_to_drop is 50GB; max_table_size_to_drop=0 allows to DROP any tables.
|
||||
The same for max_partition_size_to_drop.
|
||||
Uncomment to disable protection.
|
||||
-->
|
||||
<!-- <max_table_size_to_drop>0</max_table_size_to_drop> -->
|
||||
<!-- <max_partition_size_to_drop>0</max_partition_size_to_drop> -->
|
||||
|
||||
<!-- Example of parameters for GraphiteMergeTree table engine -->
|
||||
<graphite_rollup_example>
|
||||
<pattern>
|
||||
<regexp>click_cost</regexp>
|
||||
<function>any</function>
|
||||
<retention>
|
||||
<age>0</age>
|
||||
<precision>3600</precision>
|
||||
</retention>
|
||||
<retention>
|
||||
<age>86400</age>
|
||||
<precision>60</precision>
|
||||
</retention>
|
||||
</pattern>
|
||||
<default>
|
||||
<function>max</function>
|
||||
<retention>
|
||||
<age>0</age>
|
||||
<precision>60</precision>
|
||||
</retention>
|
||||
<retention>
|
||||
<age>3600</age>
|
||||
<precision>300</precision>
|
||||
</retention>
|
||||
<retention>
|
||||
<age>86400</age>
|
||||
<precision>3600</precision>
|
||||
</retention>
|
||||
</default>
|
||||
</graphite_rollup_example>
|
||||
|
||||
<!-- Directory in <clickhouse-path> containing schema files for various input formats.
|
||||
The directory will be created if it doesn't exist.
|
||||
-->
|
||||
<format_schema_path>/var/lib/clickhouse/format_schemas/</format_schema_path>
|
||||
|
||||
<!-- Uncomment to use query masking rules.
|
||||
name - name for the rule (optional)
|
||||
regexp - RE2 compatible regular expression (mandatory)
|
||||
replace - substitution string for sensitive data (optional, by default - six asterisks)
|
||||
<query_masking_rules>
|
||||
<rule>
|
||||
<name>hide SSN</name>
|
||||
<regexp>\b\d{3}-\d{2}-\d{4}\b</regexp>
|
||||
<replace>000-00-0000</replace>
|
||||
</rule>
|
||||
</query_masking_rules>
|
||||
-->
|
||||
|
||||
<!-- Uncomment to use custom http handlers.
|
||||
rules are checked from top to bottom, first match runs the handler
|
||||
url - to match request URL, you can use 'regex:' prefix to use regex match(optional)
|
||||
methods - to match request method, you can use commas to separate multiple method matches(optional)
|
||||
headers - to match request headers, match each child element(child element name is header name), you can use 'regex:' prefix to use regex match(optional)
|
||||
handler is request handler
|
||||
type - supported types: static, dynamic_query_handler, predefined_query_handler
|
||||
query - use with predefined_query_handler type, executes query when the handler is called
|
||||
query_param_name - use with dynamic_query_handler type, extracts and executes the value corresponding to the <query_param_name> value in HTTP request params
|
||||
status - use with static type, response status code
|
||||
content_type - use with static type, response content-type
|
||||
response_content - use with static type, Response content sent to client, when using the prefix 'file://' or 'config://', find the content from the file or configuration send to client.
|
||||
|
||||
<http_handlers>
|
||||
<rule>
|
||||
<url>/</url>
|
||||
<methods>POST,GET</methods>
|
||||
<headers><pragma>no-cache</pragma></headers>
|
||||
<handler>
|
||||
<type>dynamic_query_handler</type>
|
||||
<query_param_name>query</query_param_name>
|
||||
</handler>
|
||||
</rule>
|
||||
|
||||
<rule>
|
||||
<url>/predefined_query</url>
|
||||
<methods>POST,GET</methods>
|
||||
<handler>
|
||||
<type>predefined_query_handler</type>
|
||||
<query>SELECT * FROM system.settings</query>
|
||||
</handler>
|
||||
</rule>
|
||||
|
||||
<rule>
|
||||
<handler>
|
||||
<type>static</type>
|
||||
<status>200</status>
|
||||
<content_type>text/plain; charset=UTF-8</content_type>
|
||||
<response_content>config://http_server_default_response</response_content>
|
||||
</handler>
|
||||
</rule>
|
||||
</http_handlers>
|
||||
-->
|
||||
|
||||
<!-- Uncomment to disable ClickHouse internal DNS caching. -->
|
||||
<!-- <disable_internal_dns_cache>1</disable_internal_dns_cache> -->
|
||||
</yandex>
|
|
@ -0,0 +1,110 @@
|
|||
<?xml version="1.0"?>
|
||||
<yandex>
|
||||
<!-- Profiles of settings. -->
|
||||
<profiles>
|
||||
<!-- Default settings. -->
|
||||
<default>
|
||||
<!-- Maximum memory usage for processing single query, in bytes. -->
|
||||
<max_memory_usage>10000000000</max_memory_usage>
|
||||
|
||||
<!-- Use cache of uncompressed blocks of data. Meaningful only for processing many of very short queries. -->
|
||||
<use_uncompressed_cache>0</use_uncompressed_cache>
|
||||
|
||||
<!-- How to choose between replicas during distributed query processing.
|
||||
random - choose random replica from set of replicas with minimum number of errors
|
||||
nearest_hostname - from set of replicas with minimum number of errors, choose replica
|
||||
with minimum number of different symbols between replica's hostname and local hostname
|
||||
(Hamming distance).
|
||||
in_order - first live replica is chosen in specified order.
|
||||
first_or_random - if first replica one has higher number of errors, pick a random one from replicas with minimum number of errors.
|
||||
-->
|
||||
<load_balancing>random</load_balancing>
|
||||
</default>
|
||||
|
||||
<!-- Profile that allows only read queries. -->
|
||||
<readonly>
|
||||
<readonly>1</readonly>
|
||||
</readonly>
|
||||
</profiles>
|
||||
|
||||
<!-- Users and ACL. -->
|
||||
<users>
|
||||
<!-- If user name was not specified, 'default' user is used. -->
|
||||
<default>
|
||||
<!-- Password could be specified in plaintext or in SHA256 (in hex format).
|
||||
|
||||
If you want to specify password in plaintext (not recommended), place it in 'password' element.
|
||||
Example: <password>qwerty</password>.
|
||||
Password could be empty.
|
||||
|
||||
If you want to specify SHA256, place it in 'password_sha256_hex' element.
|
||||
Example: <password_sha256_hex>65e84be33532fb784c48129675f9eff3a682b27168c0ea744b2cf58ee02337c5</password_sha256_hex>
|
||||
Restrictions of SHA256: impossibility to connect to ClickHouse using MySQL JS client (as of July 2019).
|
||||
|
||||
If you want to specify double SHA1, place it in 'password_double_sha1_hex' element.
|
||||
Example: <password_double_sha1_hex>e395796d6546b1b65db9d665cd43f0e858dd4303</password_double_sha1_hex>
|
||||
|
||||
How to generate decent password:
|
||||
Execute: PASSWORD=$(base64 < /dev/urandom | head -c8); echo "$PASSWORD"; echo -n "$PASSWORD" | sha256sum | tr -d '-'
|
||||
In first line will be password and in second - corresponding SHA256.
|
||||
|
||||
How to generate double SHA1:
|
||||
Execute: PASSWORD=$(base64 < /dev/urandom | head -c8); echo "$PASSWORD"; echo -n "$PASSWORD" | sha1sum | tr -d '-' | xxd -r -p | sha1sum | tr -d '-'
|
||||
In first line will be password and in second - corresponding double SHA1.
|
||||
-->
|
||||
<password>public</password>
|
||||
|
||||
<!-- List of networks with open access.
|
||||
|
||||
To open access from everywhere, specify:
|
||||
<ip>::/0</ip>
|
||||
|
||||
To open access only from localhost, specify:
|
||||
<ip>::1</ip>
|
||||
<ip>127.0.0.1</ip>
|
||||
|
||||
Each element of list has one of the following forms:
|
||||
<ip> IP-address or network mask. Examples: 213.180.204.3 or 10.0.0.1/8 or 10.0.0.1/255.255.255.0
|
||||
2a02:6b8::3 or 2a02:6b8::3/64 or 2a02:6b8::3/ffff:ffff:ffff:ffff::.
|
||||
<host> Hostname. Example: server01.yandex.ru.
|
||||
To check access, DNS query is performed, and all received addresses compared to peer address.
|
||||
<host_regexp> Regular expression for host names. Example, ^server\d\d-\d\d-\d\.yandex\.ru$
|
||||
To check access, DNS PTR query is performed for peer address and then regexp is applied.
|
||||
Then, for result of PTR query, another DNS query is performed and all received addresses compared to peer address.
|
||||
Strongly recommended that regexp is ends with $
|
||||
All results of DNS requests are cached till server restart.
|
||||
-->
|
||||
<networks incl="networks" replace="replace">
|
||||
<ip>::/0</ip>
|
||||
</networks>
|
||||
|
||||
<!-- Settings profile for user. -->
|
||||
<profile>default</profile>
|
||||
|
||||
<!-- Quota for user. -->
|
||||
<quota>default</quota>
|
||||
|
||||
<!-- User can create other users and grant rights to them. -->
|
||||
<!-- <access_management>1</access_management> -->
|
||||
</default>
|
||||
</users>
|
||||
|
||||
<!-- Quotas. -->
|
||||
<quotas>
|
||||
<!-- Name of quota. -->
|
||||
<default>
|
||||
<!-- Limits for time interval. You could specify many intervals with different limits. -->
|
||||
<interval>
|
||||
<!-- Length of interval. -->
|
||||
<duration>3600</duration>
|
||||
|
||||
<!-- No limits. Just calculate resource usage for time interval. -->
|
||||
<queries>0</queries>
|
||||
<errors>0</errors>
|
||||
<result_rows>0</result_rows>
|
||||
<read_rows>0</read_rows>
|
||||
<execution_time>0</execution_time>
|
||||
</interval>
|
||||
</default>
|
||||
</quotas>
|
||||
</yandex>
|
|
@ -0,0 +1,16 @@
|
|||
version: '3.9'
|
||||
|
||||
services:
|
||||
clickhouse:
|
||||
container_name: clickhouse
|
||||
image: clickhouse/clickhouse-server:23.1.2.9-alpine
|
||||
restart: always
|
||||
volumes:
|
||||
- ./clickhouse/users.xml:/etc/clickhouse-server/users.xml
|
||||
- ./clickhouse/config.xml:/etc/clickhouse-server/config.d/config.xml
|
||||
expose:
|
||||
- "8123"
|
||||
ports:
|
||||
- "8123:8123"
|
||||
networks:
|
||||
- emqx_bridge
|
|
@ -1,7 +1,7 @@
|
|||
%% -*- mode: erlang -*-
|
||||
{application, emqx_bridge, [
|
||||
{description, "EMQX bridges"},
|
||||
{vsn, "0.1.11"},
|
||||
{vsn, "0.1.12"},
|
||||
{registered, []},
|
||||
{mod, {emqx_bridge_app, []}},
|
||||
{applications, [
|
||||
|
|
|
@ -57,7 +57,8 @@
|
|||
T == influxdb_api_v2;
|
||||
T == redis_single;
|
||||
T == redis_sentinel;
|
||||
T == redis_cluster
|
||||
T == redis_cluster;
|
||||
T == clickhouse
|
||||
).
|
||||
|
||||
load() ->
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
-define(MONGO_DEFAULT_PORT, 27017).
|
||||
-define(REDIS_DEFAULT_PORT, 6379).
|
||||
-define(PGSQL_DEFAULT_PORT, 5432).
|
||||
-define(CLICKHOUSE_DEFAULT_PORT, 8123).
|
||||
|
||||
-define(AUTO_RECONNECT_INTERVAL, 2).
|
||||
|
||||
|
|
|
@ -917,6 +917,15 @@ do_call_query(_QM, _Id, _Index, _Ref, _Query, _QueryOpts, _Data) ->
|
|||
%% return `{error, {recoverable_error, Reason}}`
|
||||
EXPR
|
||||
catch
|
||||
%% For convenience and to make the code in the callbacks cleaner an
|
||||
%% error exception with the two following formats are translated to the
|
||||
%% corresponding return values. The receiver of the return values
|
||||
%% recognizes these special return formats and use them to decided if a
|
||||
%% request should be retried.
|
||||
error:{unrecoverable_error, Msg} ->
|
||||
{error, {unrecoverable_error, Msg}};
|
||||
error:{recoverable_error, Msg} ->
|
||||
{error, {recoverable_error, Msg}};
|
||||
ERR:REASON:STACKTRACE ->
|
||||
?RESOURCE_ERROR(exception, #{
|
||||
name => NAME,
|
||||
|
|
|
@ -8,3 +8,4 @@ redis
|
|||
redis_cluster
|
||||
pgsql
|
||||
tdengine
|
||||
clickhouse
|
||||
|
|
|
@ -0,0 +1,109 @@
|
|||
emqx_ee_bridge_clickhouse {
|
||||
|
||||
local_topic {
|
||||
desc {
|
||||
en: """The MQTT topic filter to be forwarded to Clickhouse. All MQTT 'PUBLISH' messages with the topic
|
||||
matching the local_topic will be forwarded.</br>
|
||||
NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is
|
||||
configured, then both the data got from the rule and the MQTT messages that match local_topic
|
||||
will be forwarded.
|
||||
"""
|
||||
zh: """发送到 'local_topic' 的消息都会转发到 Clickhouse。 </br>
|
||||
注意:如果这个 Bridge 被用作规则(EMQX 规则引擎)的输出,同时也配置了 'local_topic' ,那么这两部分的消息都会被转发。
|
||||
"""
|
||||
}
|
||||
label {
|
||||
en: "Local Topic"
|
||||
zh: "本地 Topic"
|
||||
}
|
||||
}
|
||||
sql_template {
|
||||
desc {
|
||||
en: """SQL Template. The template string can contain placeholders
|
||||
for message metadata and payload field. The placeholders are inserted
|
||||
without any checking and special formatting, so it is important to
|
||||
ensure that the inserted values are formatted and escaped correctly."""
|
||||
zh:
|
||||
"""SQL模板。模板字符串可以包含消息元数据和有效载荷字段的占位符。占位符
|
||||
的插入不需要任何检查和特殊格式化,因此必须确保插入的数值格式化和转义正确。模板字符串可以包含占位符
|
||||
模板字符串可以包含消息元数据和有效载荷字段的占位符。这些占位符被插入
|
||||
所以必须确保插入的值的格式正确。因此,确保插入的值格式化和转义正确是非常重要的。模板字符串可以包含占位符
|
||||
模板字符串可以包含消息元数据和有效载荷字段的占位符。这些占位符被插入
|
||||
所以必须确保插入的值的格式正确。确保插入的值被正确地格式化和转义。"""
|
||||
}
|
||||
label {
|
||||
en: "SQL Template"
|
||||
zh: "SQL 模板"
|
||||
}
|
||||
}
|
||||
batch_value_separator {
|
||||
desc {
|
||||
en: """The bridge repeats what comes after the VALUES or FORMAT FormatType in the
|
||||
SQL template to form a batch request. The value specified with
|
||||
this parameter will be inserted between the values. The default
|
||||
value ',' works for the VALUES format, but other values
|
||||
might be needed if you specify some other format with the
|
||||
clickhouse FORMAT syntax.
|
||||
|
||||
See https://clickhouse.com/docs/en/sql-reference/statements/insert-into/ and
|
||||
https://clickhouse.com/docs/en/interfaces/formats#formats for more information about
|
||||
the format syntax and the available formats."""
|
||||
zh: """桥接会重复 VALUES 或 FORMAT 格式类型之后的内容。中 VALUES 或
|
||||
FORMAT FormatType 后面的内容,以形成一个批处理请求。用这个参数指定的值
|
||||
这个参数指定的值将被插入到这些值之间。默认的
|
||||
默认值','适用于VALUES格式,但是如果你指定了其他的格式,可能需要其他的值。可能需要其他值,如果你用
|
||||
"clickhouse FORMAT "语法指定其他格式。语法指定其他格式。
|
||||
|
||||
参见https://clickhouse.com/docs/en/sql-reference/statements/insert-into/ 和
|
||||
https://clickhouse.com/docs/en/interfaces/formats#formats 了解更多关于
|
||||
格式语法和可用的格式。"""
|
||||
}
|
||||
label {
|
||||
en: "Batch Value Separator"
|
||||
zh: "批量值分离器"
|
||||
}
|
||||
}
|
||||
config_enable {
|
||||
desc {
|
||||
en: """Enable or disable this bridge"""
|
||||
zh: """启用/禁用桥接"""
|
||||
}
|
||||
label {
|
||||
en: "Enable Or Disable Bridge"
|
||||
zh: "启用/禁用桥接"
|
||||
}
|
||||
}
|
||||
|
||||
desc_config {
|
||||
desc {
|
||||
en: """Configuration for a Clickhouse bridge."""
|
||||
zh: """Clickhouse 桥接配置"""
|
||||
}
|
||||
label: {
|
||||
en: "Clickhouse Bridge Configuration"
|
||||
zh: "Clickhouse 桥接配置"
|
||||
}
|
||||
}
|
||||
|
||||
desc_type {
|
||||
desc {
|
||||
en: """The Bridge Type"""
|
||||
zh: """Bridge 类型"""
|
||||
}
|
||||
label {
|
||||
en: "Bridge Type"
|
||||
zh: "桥接类型"
|
||||
}
|
||||
}
|
||||
|
||||
desc_name {
|
||||
desc {
|
||||
en: """Bridge name."""
|
||||
zh: """桥接名字"""
|
||||
}
|
||||
label {
|
||||
en: "Bridge Name"
|
||||
zh: "桥接名字"
|
||||
}
|
||||
}
|
||||
}
|
|
@ -29,7 +29,8 @@ api_schemas(Method) ->
|
|||
ref(emqx_ee_bridge_redis, Method ++ "_cluster"),
|
||||
ref(emqx_ee_bridge_timescale, Method),
|
||||
ref(emqx_ee_bridge_matrix, Method),
|
||||
ref(emqx_ee_bridge_tdengine, Method)
|
||||
ref(emqx_ee_bridge_tdengine, Method),
|
||||
ref(emqx_ee_bridge_clickhouse, Method)
|
||||
].
|
||||
|
||||
schema_modules() ->
|
||||
|
@ -44,7 +45,8 @@ schema_modules() ->
|
|||
emqx_ee_bridge_pgsql,
|
||||
emqx_ee_bridge_timescale,
|
||||
emqx_ee_bridge_matrix,
|
||||
emqx_ee_bridge_tdengine
|
||||
emqx_ee_bridge_tdengine,
|
||||
emqx_ee_bridge_clickhouse
|
||||
].
|
||||
|
||||
examples(Method) ->
|
||||
|
@ -75,7 +77,8 @@ resource_type(redis_cluster) -> emqx_ee_connector_redis;
|
|||
resource_type(pgsql) -> emqx_connector_pgsql;
|
||||
resource_type(timescale) -> emqx_connector_pgsql;
|
||||
resource_type(matrix) -> emqx_connector_pgsql;
|
||||
resource_type(tdengine) -> emqx_ee_connector_tdengine.
|
||||
resource_type(tdengine) -> emqx_ee_connector_tdengine;
|
||||
resource_type(clickhouse) -> emqx_ee_connector_clickhouse.
|
||||
|
||||
fields(bridges) ->
|
||||
[
|
||||
|
@ -119,7 +122,8 @@ fields(bridges) ->
|
|||
required => false
|
||||
}
|
||||
)}
|
||||
] ++ mongodb_structs() ++ influxdb_structs() ++ redis_structs() ++ pgsql_structs().
|
||||
] ++ mongodb_structs() ++ influxdb_structs() ++ redis_structs() ++ pgsql_structs() ++
|
||||
clickhouse_structs().
|
||||
|
||||
mongodb_structs() ->
|
||||
[
|
||||
|
@ -183,3 +187,15 @@ pgsql_structs() ->
|
|||
{matrix, <<"Matrix">>}
|
||||
]
|
||||
].
|
||||
|
||||
clickhouse_structs() ->
|
||||
[
|
||||
{clickhouse,
|
||||
mk(
|
||||
hoconsc:map(name, ref(emqx_ee_bridge_clickhouse, "config")),
|
||||
#{
|
||||
desc => <<"Clickhouse Bridge Config">>,
|
||||
required => false
|
||||
}
|
||||
)}
|
||||
].
|
||||
|
|
|
@ -0,0 +1,143 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_ee_bridge_clickhouse).
|
||||
|
||||
-include_lib("emqx_bridge/include/emqx_bridge.hrl").
|
||||
-include_lib("typerefl/include/types.hrl").
|
||||
-include_lib("hocon/include/hoconsc.hrl").
|
||||
-include_lib("emqx_resource/include/emqx_resource.hrl").
|
||||
|
||||
-import(hoconsc, [mk/2, enum/1, ref/2]).
|
||||
|
||||
-export([
|
||||
conn_bridge_examples/1
|
||||
]).
|
||||
|
||||
-export([
|
||||
namespace/0,
|
||||
roots/0,
|
||||
fields/1,
|
||||
desc/1
|
||||
]).
|
||||
|
||||
-define(DEFAULT_SQL,
|
||||
<<"INSERT INTO mqtt_test(payload, arrived) VALUES ('${payload}', ${timestamp})">>
|
||||
).
|
||||
|
||||
-define(DEFAULT_BATCH_VALUE_SEPARATOR, <<", ">>).
|
||||
|
||||
%% -------------------------------------------------------------------------------------------------
|
||||
%% Callback used by HTTP API
|
||||
%% -------------------------------------------------------------------------------------------------
|
||||
|
||||
conn_bridge_examples(Method) ->
|
||||
[
|
||||
#{
|
||||
<<"clickhouse">> => #{
|
||||
summary => <<"Clickhouse Bridge">>,
|
||||
value => values(Method, "clickhouse")
|
||||
}
|
||||
}
|
||||
].
|
||||
|
||||
values(get, Type) ->
|
||||
maps:merge(values(post, Type), ?METRICS_EXAMPLE);
|
||||
values(post, Type) ->
|
||||
#{
|
||||
enable => true,
|
||||
type => Type,
|
||||
name => <<"foo">>,
|
||||
server => <<"127.0.0.1:8123">>,
|
||||
database => <<"mqtt">>,
|
||||
pool_size => 8,
|
||||
username => <<"default">>,
|
||||
password => <<"public">>,
|
||||
sql => ?DEFAULT_SQL,
|
||||
batch_value_separator => ?DEFAULT_BATCH_VALUE_SEPARATOR,
|
||||
local_topic => <<"local/topic/#">>,
|
||||
resource_opts => #{
|
||||
worker_pool_size => 8,
|
||||
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
|
||||
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
|
||||
batch_size => ?DEFAULT_BATCH_SIZE,
|
||||
batch_time => ?DEFAULT_BATCH_TIME,
|
||||
query_mode => async,
|
||||
max_queue_bytes => ?DEFAULT_QUEUE_SIZE
|
||||
}
|
||||
};
|
||||
values(put, Type) ->
|
||||
values(post, Type).
|
||||
|
||||
%% -------------------------------------------------------------------------------------------------
|
||||
%% Hocon Schema Definitions
|
||||
%% -------------------------------------------------------------------------------------------------
|
||||
|
||||
namespace() -> "bridge_clickhouse".
|
||||
|
||||
roots() -> [].
|
||||
|
||||
fields("config") ->
|
||||
[
|
||||
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
|
||||
{sql,
|
||||
mk(
|
||||
binary(),
|
||||
#{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>}
|
||||
)},
|
||||
{batch_value_separator,
|
||||
mk(
|
||||
binary(),
|
||||
#{desc => ?DESC("batch_value_separator"), default => ?DEFAULT_BATCH_VALUE_SEPARATOR}
|
||||
)},
|
||||
{local_topic,
|
||||
mk(
|
||||
binary(),
|
||||
#{desc => ?DESC("local_topic"), default => undefined}
|
||||
)},
|
||||
{resource_opts,
|
||||
mk(
|
||||
ref(?MODULE, "creation_opts"),
|
||||
#{
|
||||
required => false,
|
||||
default => #{},
|
||||
desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
|
||||
}
|
||||
)}
|
||||
] ++
|
||||
emqx_ee_connector_clickhouse:fields(config);
|
||||
fields("creation_opts") ->
|
||||
Opts = emqx_resource_schema:fields("creation_opts"),
|
||||
[O || {Field, _} = O <- Opts, not is_hidden_opts(Field)];
|
||||
fields("post") ->
|
||||
fields("post", clickhouse);
|
||||
fields("put") ->
|
||||
fields("config");
|
||||
fields("get") ->
|
||||
emqx_bridge_schema:status_fields() ++ fields("post").
|
||||
|
||||
fields("post", Type) ->
|
||||
[type_field(Type), name_field() | fields("config")].
|
||||
|
||||
desc("config") ->
|
||||
?DESC("desc_config");
|
||||
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
|
||||
["Configuration for Clickhouse using `", string:to_upper(Method), "` method."];
|
||||
desc("creation_opts" = Name) ->
|
||||
emqx_resource_schema:desc(Name);
|
||||
desc(_) ->
|
||||
undefined.
|
||||
|
||||
%% -------------------------------------------------------------------------------------------------
|
||||
%% internal
|
||||
%% -------------------------------------------------------------------------------------------------
|
||||
is_hidden_opts(Field) ->
|
||||
lists:member(Field, [
|
||||
async_inflight_window
|
||||
]).
|
||||
|
||||
type_field(Type) ->
|
||||
{type, mk(enum([Type]), #{required => true, desc => ?DESC("desc_type")})}.
|
||||
|
||||
name_field() ->
|
||||
{name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.
|
|
@ -0,0 +1,325 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_ee_bridge_clickhouse_SUITE).
|
||||
|
||||
-compile(nowarn_export_all).
|
||||
-compile(export_all).
|
||||
|
||||
-define(CLICKHOUSE_HOST, "clickhouse").
|
||||
-define(CLICKHOUSE_RESOURCE_MOD, emqx_ee_connector_clickhouse).
|
||||
-include_lib("emqx_connector/include/emqx_connector.hrl").
|
||||
|
||||
%% See comment in
|
||||
%% lib-ee/emqx_ee_connector/test/ee_connector_clickhouse_SUITE.erl for how to
|
||||
%% run this without bringing up the whole CI infrastucture
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Common Test Setup, Teardown and Testcase List
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
init_per_suite(Config) ->
|
||||
case
|
||||
emqx_common_test_helpers:is_tcp_server_available(?CLICKHOUSE_HOST, ?CLICKHOUSE_DEFAULT_PORT)
|
||||
of
|
||||
true ->
|
||||
emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
|
||||
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
|
||||
ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
|
||||
{ok, _} = application:ensure_all_started(emqx_connector),
|
||||
{ok, _} = application:ensure_all_started(emqx_ee_connector),
|
||||
{ok, _} = application:ensure_all_started(emqx_ee_bridge),
|
||||
snabbkaffe:fix_ct_logging(),
|
||||
%% Create the db table
|
||||
Conn = start_clickhouse_connection(),
|
||||
% erlang:monitor,sb
|
||||
{ok, _, _} = clickhouse:query(Conn, sql_create_database(), #{}),
|
||||
{ok, _, _} = clickhouse:query(Conn, sql_create_table(), []),
|
||||
clickhouse:query(Conn, sql_find_key(42), []),
|
||||
[{clickhouse_connection, Conn} | Config];
|
||||
false ->
|
||||
case os:getenv("IS_CI") of
|
||||
"yes" ->
|
||||
throw(no_clickhouse);
|
||||
_ ->
|
||||
{skip, no_clickhouse}
|
||||
end
|
||||
end.
|
||||
|
||||
start_clickhouse_connection() ->
|
||||
%% Start clickhouse connector in sub process so that it does not go
|
||||
%% down with the process that is calling init_per_suite
|
||||
InitPerSuiteProcess = self(),
|
||||
erlang:spawn(
|
||||
fun() ->
|
||||
{ok, Conn} =
|
||||
clickhouse:start_link([
|
||||
{url, clickhouse_url()},
|
||||
{user, <<"default">>},
|
||||
{key, "public"},
|
||||
{pool, tmp_pool}
|
||||
]),
|
||||
InitPerSuiteProcess ! {clickhouse_connection, Conn},
|
||||
Ref = erlang:monitor(process, Conn),
|
||||
receive
|
||||
{'DOWN', Ref, process, _, _} ->
|
||||
erlang:display(helper_down),
|
||||
ok
|
||||
end
|
||||
end
|
||||
),
|
||||
receive
|
||||
{clickhouse_connection, C} -> C
|
||||
end.
|
||||
|
||||
end_per_suite(Config) ->
|
||||
ClickhouseConnection = proplists:get_value(clickhouse_connection, Config),
|
||||
clickhouse:stop(ClickhouseConnection),
|
||||
ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
|
||||
ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
|
||||
_ = application:stop(emqx_connector),
|
||||
_ = application:stop(emqx_ee_connector),
|
||||
_ = application:stop(emqx_bridge).
|
||||
|
||||
init_per_testcase(_, Config) ->
|
||||
reset_table(Config),
|
||||
Config.
|
||||
|
||||
end_per_testcase(_, Config) ->
|
||||
reset_table(Config),
|
||||
ok.
|
||||
|
||||
all() ->
|
||||
emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Helper functions for test cases
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
sql_insert_template_for_bridge() ->
|
||||
"INSERT INTO mqtt_test(key, data, arrived) VALUES "
|
||||
"(${key}, '${data}', ${timestamp})".
|
||||
|
||||
sql_insert_template_for_bridge_json() ->
|
||||
"INSERT INTO mqtt_test(key, data, arrived) FORMAT JSONCompactEachRow "
|
||||
"[${key}, \\\"${data}\\\", ${timestamp}]".
|
||||
|
||||
sql_create_table() ->
|
||||
"CREATE TABLE IF NOT EXISTS mqtt.mqtt_test (key BIGINT, data String, arrived BIGINT) ENGINE = Memory".
|
||||
|
||||
sql_find_key(Key) ->
|
||||
io_lib:format("SELECT key FROM mqtt.mqtt_test WHERE key = ~p", [Key]).
|
||||
|
||||
sql_find_all_keys() ->
|
||||
"SELECT key FROM mqtt.mqtt_test".
|
||||
|
||||
sql_drop_table() ->
|
||||
"DROP TABLE IF EXISTS mqtt.mqtt_test".
|
||||
|
||||
sql_create_database() ->
|
||||
"CREATE DATABASE IF NOT EXISTS mqtt".
|
||||
|
||||
clickhouse_url() ->
|
||||
erlang:iolist_to_binary([
|
||||
<<"http://">>,
|
||||
?CLICKHOUSE_HOST,
|
||||
":",
|
||||
erlang:integer_to_list(?CLICKHOUSE_DEFAULT_PORT)
|
||||
]).
|
||||
|
||||
clickhouse_config(Config) ->
|
||||
SQL = maps:get(sql, Config, sql_insert_template_for_bridge()),
|
||||
BatchSeparator = maps:get(batch_value_separator, Config, <<", ">>),
|
||||
BatchSize = maps:get(batch_size, Config, 1),
|
||||
BatchTime = maps:get(batch_time_ms, Config, 0),
|
||||
EnableBatch = maps:get(enable_batch, Config, true),
|
||||
Name = atom_to_binary(?MODULE),
|
||||
URL = clickhouse_url(),
|
||||
ConfigString =
|
||||
io_lib:format(
|
||||
"bridges.clickhouse.~s {\n"
|
||||
" enable = true\n"
|
||||
" url = \"~s\"\n"
|
||||
" database = \"mqtt\"\n"
|
||||
" sql = \"~s\"\n"
|
||||
" batch_value_separator = \"~s\""
|
||||
" resource_opts = {\n"
|
||||
" enable_batch = ~w\n"
|
||||
" batch_size = ~b\n"
|
||||
" batch_time = ~bms\n"
|
||||
" }\n"
|
||||
"}\n",
|
||||
[
|
||||
Name,
|
||||
URL,
|
||||
SQL,
|
||||
BatchSeparator,
|
||||
EnableBatch,
|
||||
BatchSize,
|
||||
BatchTime
|
||||
]
|
||||
),
|
||||
ct:pal(ConfigString),
|
||||
parse_and_check(ConfigString, <<"clickhouse">>, Name).
|
||||
|
||||
parse_and_check(ConfigString, BridgeType, Name) ->
|
||||
{ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
|
||||
hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
|
||||
#{<<"bridges">> := #{BridgeType := #{Name := RetConfig}}} = RawConf,
|
||||
RetConfig.
|
||||
|
||||
make_bridge(Config) ->
|
||||
Type = <<"clickhouse">>,
|
||||
Name = atom_to_binary(?MODULE),
|
||||
BridgeConfig = clickhouse_config(Config),
|
||||
{ok, _} = emqx_bridge:create(
|
||||
Type,
|
||||
Name,
|
||||
BridgeConfig
|
||||
),
|
||||
emqx_bridge_resource:bridge_id(Type, Name).
|
||||
|
||||
delete_bridge() ->
|
||||
Type = <<"clickhouse">>,
|
||||
Name = atom_to_binary(?MODULE),
|
||||
{ok, _} = emqx_bridge:remove(Type, Name),
|
||||
ok.
|
||||
|
||||
reset_table(Config) ->
|
||||
ClickhouseConnection = proplists:get_value(clickhouse_connection, Config),
|
||||
{ok, _, _} = clickhouse:query(ClickhouseConnection, sql_drop_table(), []),
|
||||
{ok, _, _} = clickhouse:query(ClickhouseConnection, sql_create_table(), []),
|
||||
ok.
|
||||
|
||||
check_key_in_clickhouse(AttempsLeft, Key, Config) ->
|
||||
ClickhouseConnection = proplists:get_value(clickhouse_connection, Config),
|
||||
check_key_in_clickhouse(AttempsLeft, Key, none, ClickhouseConnection).
|
||||
|
||||
check_key_in_clickhouse(Key, Config) ->
|
||||
ClickhouseConnection = proplists:get_value(clickhouse_connection, Config),
|
||||
check_key_in_clickhouse(30, Key, none, ClickhouseConnection).
|
||||
|
||||
check_key_in_clickhouse(0, Key, PrevResult, _) ->
|
||||
ct:fail("Expected ~p in database but got ~s", [Key, PrevResult]);
|
||||
check_key_in_clickhouse(AttempsLeft, Key, _, ClickhouseConnection) ->
|
||||
{ok, 200, ResultString} = clickhouse:query(ClickhouseConnection, sql_find_key(Key), []),
|
||||
Expected = erlang:integer_to_binary(Key),
|
||||
case iolist_to_binary(string:trim(ResultString)) of
|
||||
Expected ->
|
||||
ok;
|
||||
SomethingElse ->
|
||||
timer:sleep(100),
|
||||
check_key_in_clickhouse(AttempsLeft - 1, Key, SomethingElse, ClickhouseConnection)
|
||||
end.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Test Cases
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
t_make_delete_bridge(_Config) ->
|
||||
make_bridge(#{}),
|
||||
%% Check that the new brige is in the list of bridges
|
||||
Bridges = emqx_bridge:list(),
|
||||
Name = atom_to_binary(?MODULE),
|
||||
IsRightName =
|
||||
fun
|
||||
(#{name := BName}) when BName =:= Name ->
|
||||
true;
|
||||
(_) ->
|
||||
false
|
||||
end,
|
||||
true = lists:any(IsRightName, Bridges),
|
||||
delete_bridge(),
|
||||
BridgesAfterDelete = emqx_bridge:list(),
|
||||
false = lists:any(IsRightName, BridgesAfterDelete),
|
||||
ok.
|
||||
|
||||
t_send_message_query(Config) ->
|
||||
BridgeID = make_bridge(#{enable_batch => false}),
|
||||
Key = 42,
|
||||
Payload = #{key => Key, data => <<"clickhouse_data">>, timestamp => 10000},
|
||||
%% This will use the SQL template included in the bridge
|
||||
emqx_bridge:send_message(BridgeID, Payload),
|
||||
%% Check that the data got to the database
|
||||
check_key_in_clickhouse(Key, Config),
|
||||
delete_bridge(),
|
||||
ok.
|
||||
|
||||
t_send_simple_batch(Config) ->
|
||||
send_simple_batch_helper(Config, #{}).
|
||||
|
||||
t_send_simple_batch_alternative_format(Config) ->
|
||||
send_simple_batch_helper(
|
||||
Config,
|
||||
#{
|
||||
sql => sql_insert_template_for_bridge_json(),
|
||||
batch_value_separator => <<"">>
|
||||
}
|
||||
).
|
||||
|
||||
send_simple_batch_helper(Config, BridgeConfigExt) ->
|
||||
BridgeConf = maps:merge(
|
||||
#{
|
||||
batch_size => 100,
|
||||
enable_batch => true
|
||||
},
|
||||
BridgeConfigExt
|
||||
),
|
||||
BridgeID = make_bridge(BridgeConf),
|
||||
Key = 42,
|
||||
Payload = #{key => Key, data => <<"clickhouse_data">>, timestamp => 10000},
|
||||
%% This will use the SQL template included in the bridge
|
||||
emqx_bridge:send_message(BridgeID, Payload),
|
||||
check_key_in_clickhouse(Key, Config),
|
||||
delete_bridge(),
|
||||
ok.
|
||||
|
||||
t_heavy_batching(Config) ->
|
||||
heavy_batching_helper(Config, #{}).
|
||||
|
||||
t_heavy_batching_alternative_format(Config) ->
|
||||
heavy_batching_helper(
|
||||
Config,
|
||||
#{
|
||||
sql => sql_insert_template_for_bridge_json(),
|
||||
batch_value_separator => <<"">>
|
||||
}
|
||||
).
|
||||
|
||||
heavy_batching_helper(Config, BridgeConfigExt) ->
|
||||
ClickhouseConnection = proplists:get_value(clickhouse_connection, Config),
|
||||
NumberOfMessages = 10000,
|
||||
BridgeConf = maps:merge(
|
||||
#{
|
||||
batch_size => 743,
|
||||
batch_time_ms => 50,
|
||||
enable_batch => true
|
||||
},
|
||||
BridgeConfigExt
|
||||
),
|
||||
BridgeID = make_bridge(BridgeConf),
|
||||
SendMessageKey = fun(Key) ->
|
||||
Payload = #{
|
||||
key => Key,
|
||||
data => <<"clickhouse_data">>,
|
||||
timestamp => 10000
|
||||
},
|
||||
emqx_bridge:send_message(BridgeID, Payload)
|
||||
end,
|
||||
[SendMessageKey(Key) || Key <- lists:seq(1, NumberOfMessages)],
|
||||
% Wait until the last message is in clickhouse
|
||||
%% The delay between attempts is 100ms so 150 attempts means 15 seconds
|
||||
check_key_in_clickhouse(_AttemptsToFindKey = 150, NumberOfMessages, Config),
|
||||
%% In case the messages are not sent in order (could happend with multiple buffer workers)
|
||||
timer:sleep(1000),
|
||||
{ok, 200, ResultString1} = clickhouse:query(ClickhouseConnection, sql_find_all_keys(), []),
|
||||
ResultString2 = iolist_to_binary(string:trim(ResultString1)),
|
||||
KeyStrings = string:lexemes(ResultString2, "\n"),
|
||||
Keys = [erlang:binary_to_integer(iolist_to_binary(K)) || K <- KeyStrings],
|
||||
KeySet = maps:from_keys(Keys, true),
|
||||
NumberOfMessages = maps:size(KeySet),
|
||||
CheckKey = fun(Key) -> maps:get(Key, KeySet, false) end,
|
||||
true = lists:all(CheckKey, lists:seq(1, NumberOfMessages)),
|
||||
delete_bridge(),
|
||||
ok.
|
|
@ -1,2 +1,3 @@
|
|||
toxiproxy
|
||||
influxdb
|
||||
clickhouse
|
||||
|
|
|
@ -0,0 +1,15 @@
|
|||
|
||||
emqx_ee_connector_clickhouse {
|
||||
|
||||
base_url {
|
||||
desc {
|
||||
en: """The HTTP URL to the Clickhouse server that you want to connect to (for example http://myhostname:8123)"""
|
||||
zh: """你想连接到的Clickhouse服务器的HTTP URL(例如http://myhostname:8123)。"""
|
||||
}
|
||||
label: {
|
||||
en: "URL to clickhouse server"
|
||||
zh: "到clickhouse服务器的URL"
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -3,6 +3,7 @@
|
|||
{hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.2.5"}}},
|
||||
{influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.9"}}},
|
||||
{tdengine, {git, "https://github.com/emqx/tdengine-client-erl", {tag, "0.1.5"}}},
|
||||
{clickhouse, {git, "https://github.com/emqx/clickhouse-client-erl", {tag, "0.2"}}},
|
||||
{emqx, {path, "../../apps/emqx"}}
|
||||
]}.
|
||||
|
||||
|
|
|
@ -9,7 +9,8 @@
|
|||
influxdb,
|
||||
tdengine,
|
||||
wolff,
|
||||
brod
|
||||
brod,
|
||||
clickhouse
|
||||
]},
|
||||
{env, []},
|
||||
{modules, []},
|
||||
|
|
|
@ -0,0 +1,444 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_ee_connector_clickhouse).
|
||||
|
||||
-include_lib("emqx_connector/include/emqx_connector.hrl").
|
||||
-include_lib("emqx_resource/include/emqx_resource.hrl").
|
||||
-include_lib("typerefl/include/types.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
-include_lib("hocon/include/hoconsc.hrl").
|
||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
|
||||
-behaviour(emqx_resource).
|
||||
|
||||
-import(hoconsc, [mk/2, enum/1, ref/2]).
|
||||
|
||||
%%=====================================================================
|
||||
%% Exports
|
||||
%%=====================================================================
|
||||
|
||||
%% Hocon config schema exports
|
||||
-export([
|
||||
roots/0,
|
||||
fields/1,
|
||||
values/1
|
||||
]).
|
||||
|
||||
%% callbacks for behaviour emqx_resource
|
||||
-export([
|
||||
callback_mode/0,
|
||||
on_start/2,
|
||||
on_stop/2,
|
||||
on_query/3,
|
||||
on_batch_query/3,
|
||||
on_get_status/2
|
||||
]).
|
||||
|
||||
%% callbacks for ecpool
|
||||
-export([connect/1]).
|
||||
|
||||
%% Internal exports used to execute code with ecpool worker
|
||||
-export([
|
||||
check_database_status/1,
|
||||
execute_sql_in_clickhouse_server_using_connection/2
|
||||
]).
|
||||
|
||||
%%=====================================================================
|
||||
%% Types
|
||||
%%=====================================================================
|
||||
|
||||
-type url() :: emqx_http_lib:uri_map().
|
||||
-reflect_type([url/0]).
|
||||
-typerefl_from_string({url/0, emqx_http_lib, uri_parse}).
|
||||
|
||||
-type templates() ::
|
||||
#{}
|
||||
| #{
|
||||
send_message_template := term(),
|
||||
extend_send_message_template := term()
|
||||
}.
|
||||
|
||||
-type state() ::
|
||||
#{
|
||||
templates := templates(),
|
||||
poolname := atom()
|
||||
}.
|
||||
|
||||
-type clickhouse_config() :: map().
|
||||
|
||||
%%=====================================================================
|
||||
%% Configuration and default values
|
||||
%%=====================================================================
|
||||
|
||||
roots() ->
|
||||
[{config, #{type => hoconsc:ref(?MODULE, config)}}].
|
||||
|
||||
fields(config) ->
|
||||
[
|
||||
{url,
|
||||
hoconsc:mk(
|
||||
url(),
|
||||
#{
|
||||
required => true,
|
||||
validator => fun
|
||||
(#{query := _Query}) ->
|
||||
{error, "There must be no query in the url"};
|
||||
(_) ->
|
||||
ok
|
||||
end,
|
||||
desc => ?DESC("base_url")
|
||||
}
|
||||
)}
|
||||
] ++ emqx_connector_schema_lib:relational_db_fields().
|
||||
|
||||
values(post) ->
|
||||
maps:merge(values(put), #{name => <<"connector">>});
|
||||
values(get) ->
|
||||
values(post);
|
||||
values(put) ->
|
||||
#{
|
||||
database => <<"mqtt">>,
|
||||
enable => true,
|
||||
pool_size => 8,
|
||||
type => clickhouse,
|
||||
url => <<"http://127.0.0.1:8123">>
|
||||
};
|
||||
values(_) ->
|
||||
#{}.
|
||||
|
||||
%% ===================================================================
|
||||
%% Callbacks defined in emqx_resource
|
||||
%% ===================================================================
|
||||
|
||||
callback_mode() -> always_sync.
|
||||
|
||||
%% -------------------------------------------------------------------
|
||||
%% on_start callback and related functions
|
||||
%% -------------------------------------------------------------------
|
||||
|
||||
-spec on_start(resource_id(), clickhouse_config()) -> {ok, state()} | {error, _}.
|
||||
|
||||
on_start(
|
||||
InstanceID,
|
||||
#{
|
||||
url := URL,
|
||||
database := DB,
|
||||
pool_size := PoolSize
|
||||
} = Config
|
||||
) ->
|
||||
?SLOG(info, #{
|
||||
msg => "starting_clickhouse_connector",
|
||||
connector => InstanceID,
|
||||
config => emqx_misc:redact(Config)
|
||||
}),
|
||||
PoolName = emqx_plugin_libs_pool:pool_name(InstanceID),
|
||||
Options = [
|
||||
{url, URL},
|
||||
{user, maps:get(username, Config, "default")},
|
||||
{key, emqx_secret:wrap(maps:get(password, Config, "public"))},
|
||||
{database, DB},
|
||||
{auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
|
||||
{pool_size, PoolSize},
|
||||
{pool, PoolName}
|
||||
],
|
||||
InitState = #{poolname => PoolName},
|
||||
try
|
||||
Templates = prepare_sql_templates(Config),
|
||||
State = maps:merge(InitState, #{templates => Templates}),
|
||||
case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options) of
|
||||
ok ->
|
||||
{ok, State};
|
||||
{error, Reason} ->
|
||||
log_start_error(Config, Reason, none),
|
||||
{error, Reason}
|
||||
end
|
||||
catch
|
||||
_:CatchReason:Stacktrace ->
|
||||
log_start_error(Config, CatchReason, Stacktrace),
|
||||
{error, CatchReason}
|
||||
end.
|
||||
|
||||
log_start_error(Config, Reason, Stacktrace) ->
|
||||
StacktraceMap =
|
||||
case Stacktrace of
|
||||
none -> #{};
|
||||
_ -> #{stacktrace => Stacktrace}
|
||||
end,
|
||||
LogMessage =
|
||||
#{
|
||||
msg => "clickhouse_connector_start_failed",
|
||||
error_reason => Reason,
|
||||
config => emqx_misc:redact(Config)
|
||||
},
|
||||
?SLOG(info, maps:merge(LogMessage, StacktraceMap)),
|
||||
?tp(
|
||||
clickhouse_connector_start_failed,
|
||||
#{error => Reason}
|
||||
).
|
||||
|
||||
%% Helper functions to prepare SQL tempaltes
|
||||
|
||||
prepare_sql_templates(#{
|
||||
sql := Template,
|
||||
batch_value_separator := Separator
|
||||
}) ->
|
||||
InsertTemplate =
|
||||
emqx_plugin_libs_rule:preproc_tmpl(Template),
|
||||
BulkExtendInsertTemplate =
|
||||
prepare_sql_bulk_extend_template(Template, Separator),
|
||||
#{
|
||||
send_message_template => InsertTemplate,
|
||||
extend_send_message_template => BulkExtendInsertTemplate
|
||||
};
|
||||
prepare_sql_templates(_) ->
|
||||
%% We don't create any templates if this is a non-bridge connector
|
||||
#{}.
|
||||
|
||||
prepare_sql_bulk_extend_template(Template, Separator) ->
|
||||
ValuesTemplate = split_clickhouse_insert_sql(Template),
|
||||
%% The value part has been extracted
|
||||
%% Add separator before ValuesTemplate so that one can append it
|
||||
%% to an insert template
|
||||
ExtendParamTemplate = iolist_to_binary([Separator, ValuesTemplate]),
|
||||
emqx_plugin_libs_rule:preproc_tmpl(ExtendParamTemplate).
|
||||
|
||||
%% This function is similar to emqx_plugin_libs_rule:split_insert_sql/1 but can
|
||||
%% also handle Clickhouse's SQL extension for INSERT statments that allows the
|
||||
%% user to specify different formats:
|
||||
%%
|
||||
%% https://clickhouse.com/docs/en/sql-reference/statements/insert-into/
|
||||
%%
|
||||
split_clickhouse_insert_sql(SQL) ->
|
||||
ErrorMsg = <<"The SQL template should be an SQL INSERT statement but it is something else.">>,
|
||||
case
|
||||
re:split(SQL, "(\\s+(?i:values)|(?i:format\\s+(?:[A-Za-z0-9_])+)\\s+)", [{return, binary}])
|
||||
of
|
||||
[Part1, _, Part3] ->
|
||||
case string:trim(Part1, leading) of
|
||||
<<"insert", _/binary>> ->
|
||||
Part3;
|
||||
<<"INSERT", _/binary>> ->
|
||||
Part3;
|
||||
_ ->
|
||||
erlang:error(ErrorMsg)
|
||||
end;
|
||||
_ ->
|
||||
erlang:error(ErrorMsg)
|
||||
end.
|
||||
|
||||
% This is a callback for ecpool which is triggered by the call to
|
||||
% emqx_plugin_libs_pool:start_pool in on_start/2
|
||||
|
||||
connect(Options) ->
|
||||
URL = iolist_to_binary(emqx_http_lib:normalize(proplists:get_value(url, Options))),
|
||||
User = proplists:get_value(user, Options),
|
||||
Database = proplists:get_value(database, Options),
|
||||
Key = emqx_secret:unwrap(proplists:get_value(key, Options)),
|
||||
Pool = proplists:get_value(pool, Options),
|
||||
PoolSize = proplists:get_value(pool_size, Options),
|
||||
FixedOptions = [
|
||||
{url, URL},
|
||||
{database, Database},
|
||||
{user, User},
|
||||
{key, Key},
|
||||
{pool, Pool},
|
||||
{pool_size, PoolSize}
|
||||
],
|
||||
case clickhouse:start_link(FixedOptions) of
|
||||
{ok, _Conn} = Ok ->
|
||||
Ok;
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
%% -------------------------------------------------------------------
|
||||
%% on_stop emqx_resouce callback
|
||||
%% -------------------------------------------------------------------
|
||||
|
||||
-spec on_stop(resource_id(), resource_state()) -> term().
|
||||
|
||||
on_stop(ResourceID, #{poolname := PoolName}) ->
|
||||
?SLOG(info, #{
|
||||
msg => "stopping clickouse connector",
|
||||
connector => ResourceID
|
||||
}),
|
||||
emqx_plugin_libs_pool:stop_pool(PoolName).
|
||||
|
||||
%% -------------------------------------------------------------------
|
||||
%% on_get_status emqx_resouce callback and related functions
|
||||
%% -------------------------------------------------------------------
|
||||
|
||||
on_get_status(_ResourceID, #{poolname := Pool} = _State) ->
|
||||
case
|
||||
emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:check_database_status/1)
|
||||
of
|
||||
true ->
|
||||
connected;
|
||||
false ->
|
||||
connecting
|
||||
end.
|
||||
|
||||
check_database_status(Connection) ->
|
||||
clickhouse:status(Connection).
|
||||
|
||||
%% -------------------------------------------------------------------
|
||||
%% on_query emqx_resouce callback and related functions
|
||||
%% -------------------------------------------------------------------
|
||||
|
||||
-spec on_query
|
||||
(resource_id(), Request, resource_state()) -> query_result() when
|
||||
Request :: {RequestType, Data},
|
||||
RequestType :: send_message,
|
||||
Data :: map();
|
||||
(resource_id(), Request, resource_state()) -> query_result() when
|
||||
Request :: {RequestType, SQL},
|
||||
RequestType :: sql | query,
|
||||
SQL :: binary().
|
||||
|
||||
on_query(
|
||||
ResourceID,
|
||||
{RequestType, DataOrSQL},
|
||||
#{poolname := PoolName} = State
|
||||
) ->
|
||||
?SLOG(debug, #{
|
||||
msg => "clickhouse connector received sql query",
|
||||
connector => ResourceID,
|
||||
type => RequestType,
|
||||
sql => DataOrSQL,
|
||||
state => State
|
||||
}),
|
||||
%% Have we got a query or data to fit into an SQL template?
|
||||
SimplifiedRequestType = query_type(RequestType),
|
||||
#{templates := Templates} = State,
|
||||
SQL = get_sql(SimplifiedRequestType, Templates, DataOrSQL),
|
||||
ClickhouseResult = execute_sql_in_clickhouse_server(PoolName, SQL),
|
||||
transform_and_log_clickhouse_result(ClickhouseResult, ResourceID, SQL).
|
||||
|
||||
get_sql(send_message, #{send_message_template := PreparedSQL}, Data) ->
|
||||
emqx_plugin_libs_rule:proc_tmpl(PreparedSQL, Data);
|
||||
get_sql(_, _, SQL) ->
|
||||
SQL.
|
||||
|
||||
query_type(sql) ->
|
||||
query;
|
||||
query_type(query) ->
|
||||
query;
|
||||
%% Data that goes to bridges use the prepared template
|
||||
query_type(send_message) ->
|
||||
send_message.
|
||||
|
||||
%% -------------------------------------------------------------------
|
||||
%% on_batch_query emqx_resouce callback and related functions
|
||||
%% -------------------------------------------------------------------
|
||||
|
||||
-spec on_batch_query(resource_id(), BatchReq, resource_state()) -> query_result() when
|
||||
BatchReq :: nonempty_list({'send_message', map()}).
|
||||
|
||||
on_batch_query(
|
||||
ResourceID,
|
||||
BatchReq,
|
||||
State
|
||||
) ->
|
||||
%% Currently we only support batch requests with the send_message key
|
||||
{Keys, ObjectsToInsert} = lists:unzip(BatchReq),
|
||||
ensure_keys_are_of_type_send_message(Keys),
|
||||
%% Pick out the SQL template
|
||||
#{
|
||||
templates := Templates,
|
||||
poolname := PoolName
|
||||
} = State,
|
||||
%% Create batch insert SQL statement
|
||||
SQL = objects_to_sql(ObjectsToInsert, Templates),
|
||||
%% Do the actual query in the database
|
||||
ResultFromClickhouse = execute_sql_in_clickhouse_server(PoolName, SQL),
|
||||
%% Transform the result to a better format
|
||||
transform_and_log_clickhouse_result(ResultFromClickhouse, ResourceID, SQL).
|
||||
|
||||
ensure_keys_are_of_type_send_message(Keys) ->
|
||||
case lists:all(fun is_send_message_atom/1, Keys) of
|
||||
true ->
|
||||
ok;
|
||||
false ->
|
||||
erlang:error(
|
||||
{unrecoverable_error,
|
||||
<<"Unexpected type for batch message (Expected send_message)">>}
|
||||
)
|
||||
end.
|
||||
|
||||
is_send_message_atom(send_message) ->
|
||||
true;
|
||||
is_send_message_atom(_) ->
|
||||
false.
|
||||
|
||||
objects_to_sql(
|
||||
[FirstObject | RemainingObjects] = _ObjectsToInsert,
|
||||
#{
|
||||
send_message_template := InsertTemplate,
|
||||
extend_send_message_template := BulkExtendInsertTemplate
|
||||
}
|
||||
) ->
|
||||
%% Prepare INSERT-statement and the first row after VALUES
|
||||
InsertStatementHead = emqx_plugin_libs_rule:proc_tmpl(InsertTemplate, FirstObject),
|
||||
FormatObjectDataFunction =
|
||||
fun(Object) ->
|
||||
emqx_plugin_libs_rule:proc_tmpl(BulkExtendInsertTemplate, Object)
|
||||
end,
|
||||
InsertStatementTail = lists:map(FormatObjectDataFunction, RemainingObjects),
|
||||
CompleteStatement = erlang:iolist_to_binary([InsertStatementHead, InsertStatementTail]),
|
||||
CompleteStatement;
|
||||
objects_to_sql(_, _) ->
|
||||
erlang:error(<<"Templates for bulk insert missing.">>).
|
||||
|
||||
%% -------------------------------------------------------------------
|
||||
%% Helper functions that are used by both on_query/3 and on_batch_query/3
|
||||
%% -------------------------------------------------------------------
|
||||
|
||||
%% This function is used by on_query/3 and on_batch_query/3 to send a query to
|
||||
%% the database server and receive a result
|
||||
execute_sql_in_clickhouse_server(PoolName, SQL) ->
|
||||
ecpool:pick_and_do(
|
||||
PoolName,
|
||||
{?MODULE, execute_sql_in_clickhouse_server_using_connection, [SQL]},
|
||||
no_handover
|
||||
).
|
||||
|
||||
execute_sql_in_clickhouse_server_using_connection(Connection, SQL) ->
|
||||
clickhouse:query(Connection, SQL, []).
|
||||
|
||||
%% This function transforms the result received from clickhouse to something
|
||||
%% that is a little bit more readable and creates approprieate log messages
|
||||
transform_and_log_clickhouse_result({ok, 200, <<"">>} = _ClickhouseResult, _, _) ->
|
||||
snabbkaffe_log_return(ok),
|
||||
ok;
|
||||
transform_and_log_clickhouse_result({ok, 200, Data}, _, _) ->
|
||||
Result = {ok, Data},
|
||||
snabbkaffe_log_return(Result),
|
||||
Result;
|
||||
transform_and_log_clickhouse_result(ClickhouseErrorResult, ResourceID, SQL) ->
|
||||
?SLOG(error, #{
|
||||
msg => "clickhouse connector do sql query failed",
|
||||
connector => ResourceID,
|
||||
sql => SQL,
|
||||
reason => ClickhouseErrorResult
|
||||
}),
|
||||
{error, ClickhouseErrorResult}.
|
||||
|
||||
snabbkaffe_log_return(_Result) ->
|
||||
?tp(
|
||||
clickhouse_connector_query_return,
|
||||
#{result => _Result}
|
||||
).
|
|
@ -0,0 +1,198 @@
|
|||
% %%--------------------------------------------------------------------
|
||||
% %% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
% %%
|
||||
% %% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
% %% you may not use this file except in compliance with the License.
|
||||
% %% You may obtain a copy of the License at
|
||||
% %% http://www.apache.org/licenses/LICENSE-2.0
|
||||
% %%
|
||||
% %% Unless required by applicable law or agreed to in writing, software
|
||||
% %% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
% %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
% %% See the License for the specific language governing permissions and
|
||||
% %% limitations under the License.
|
||||
% %%--------------------------------------------------------------------
|
||||
|
||||
-module(ee_connector_clickhouse_SUITE).
|
||||
|
||||
-compile(nowarn_export_all).
|
||||
-compile(export_all).
|
||||
|
||||
-include("emqx_connector.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("emqx/include/emqx.hrl").
|
||||
-include_lib("stdlib/include/assert.hrl").
|
||||
|
||||
-define(CLICKHOUSE_HOST, "clickhouse").
|
||||
-define(CLICKHOUSE_RESOURCE_MOD, emqx_ee_connector_clickhouse).
|
||||
|
||||
%% This test SUITE requires a running clickhouse instance. If you don't want to
|
||||
%% bring up the whole CI infrastuctucture with the `scripts/ct/run.sh` script
|
||||
%% you can create a clickhouse instance with the following command (execute it
|
||||
%% from root of the EMQX directory.). You also need to set ?CLICKHOUSE_HOST and
|
||||
%% ?CLICKHOUSE_PORT to appropriate values.
|
||||
%%
|
||||
%% docker run -d -p 18123:8123 -p19000:9000 --name some-clickhouse-server --ulimit nofile=262144:262144 -v "`pwd`/.ci/docker-compose-file/clickhouse/users.xml:/etc/clickhouse-server/users.xml" -v "`pwd`/.ci/docker-compose-file/clickhouse/config.xml:/etc/clickhouse-server/config.xml" clickhouse/clickhouse-server
|
||||
|
||||
all() ->
|
||||
emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
groups() ->
|
||||
[].
|
||||
|
||||
clickhouse_url() ->
|
||||
erlang:iolist_to_binary([
|
||||
<<"http://">>,
|
||||
?CLICKHOUSE_HOST,
|
||||
":",
|
||||
erlang:integer_to_list(?CLICKHOUSE_DEFAULT_PORT)
|
||||
]).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
case
|
||||
emqx_common_test_helpers:is_tcp_server_available(?CLICKHOUSE_HOST, ?CLICKHOUSE_DEFAULT_PORT)
|
||||
of
|
||||
true ->
|
||||
ok = emqx_common_test_helpers:start_apps([emqx_conf]),
|
||||
ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
|
||||
{ok, _} = application:ensure_all_started(emqx_connector),
|
||||
{ok, _} = application:ensure_all_started(emqx_ee_connector),
|
||||
%% Create the db table
|
||||
{ok, Conn} =
|
||||
clickhouse:start_link([
|
||||
{url, clickhouse_url()},
|
||||
{user, <<"default">>},
|
||||
{key, "public"},
|
||||
{pool, tmp_pool}
|
||||
]),
|
||||
{ok, _, _} = clickhouse:query(Conn, <<"CREATE DATABASE IF NOT EXISTS mqtt">>, #{}),
|
||||
clickhouse:stop(Conn),
|
||||
Config;
|
||||
false ->
|
||||
case os:getenv("IS_CI") of
|
||||
"yes" ->
|
||||
throw(no_clickhouse);
|
||||
_ ->
|
||||
{skip, no_clickhouse}
|
||||
end
|
||||
end.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
|
||||
ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
|
||||
_ = application:stop(emqx_connector).
|
||||
|
||||
init_per_testcase(_, Config) ->
|
||||
Config.
|
||||
|
||||
end_per_testcase(_, _Config) ->
|
||||
ok.
|
||||
|
||||
% %%------------------------------------------------------------------------------
|
||||
% %% Testcases
|
||||
% %%------------------------------------------------------------------------------
|
||||
|
||||
t_lifecycle(_Config) ->
|
||||
perform_lifecycle_check(
|
||||
<<"emqx_connector_clickhouse_SUITE">>,
|
||||
clickhouse_config()
|
||||
).
|
||||
|
||||
show(X) ->
|
||||
erlang:display(X),
|
||||
X.
|
||||
|
||||
show(Label, What) ->
|
||||
erlang:display({Label, What}),
|
||||
What.
|
||||
|
||||
perform_lifecycle_check(PoolName, InitialConfig) ->
|
||||
{ok, #{config := CheckedConfig}} =
|
||||
emqx_resource:check_config(?CLICKHOUSE_RESOURCE_MOD, InitialConfig),
|
||||
{ok, #{
|
||||
state := #{poolname := ReturnedPoolName} = State,
|
||||
status := InitialStatus
|
||||
}} =
|
||||
emqx_resource:create_local(
|
||||
PoolName,
|
||||
?CONNECTOR_RESOURCE_GROUP,
|
||||
?CLICKHOUSE_RESOURCE_MOD,
|
||||
CheckedConfig,
|
||||
#{}
|
||||
),
|
||||
?assertEqual(InitialStatus, connected),
|
||||
% Instance should match the state and status of the just started resource
|
||||
{ok, ?CONNECTOR_RESOURCE_GROUP, #{
|
||||
state := State,
|
||||
status := InitialStatus
|
||||
}} =
|
||||
emqx_resource:get_instance(PoolName),
|
||||
?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
|
||||
% % Perform query as further check that the resource is working as expected
|
||||
(fun() ->
|
||||
erlang:display({pool_name, PoolName}),
|
||||
QueryNoParamsResWrapper = emqx_resource:query(PoolName, test_query_no_params()),
|
||||
?assertMatch({ok, _}, QueryNoParamsResWrapper),
|
||||
{_, QueryNoParamsRes} = QueryNoParamsResWrapper,
|
||||
?assertMatch(<<"1">>, string:trim(QueryNoParamsRes))
|
||||
end)(),
|
||||
?assertEqual(ok, emqx_resource:stop(PoolName)),
|
||||
% Resource will be listed still, but state will be changed and healthcheck will fail
|
||||
% as the worker no longer exists.
|
||||
{ok, ?CONNECTOR_RESOURCE_GROUP, #{
|
||||
state := State,
|
||||
status := StoppedStatus
|
||||
}} =
|
||||
emqx_resource:get_instance(PoolName),
|
||||
?assertEqual(stopped, StoppedStatus),
|
||||
?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(PoolName)),
|
||||
% Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
|
||||
?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
|
||||
% Can call stop/1 again on an already stopped instance
|
||||
?assertEqual(ok, emqx_resource:stop(PoolName)),
|
||||
% Make sure it can be restarted and the healthchecks and queries work properly
|
||||
?assertEqual(ok, emqx_resource:restart(PoolName)),
|
||||
% async restart, need to wait resource
|
||||
timer:sleep(500),
|
||||
{ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} =
|
||||
emqx_resource:get_instance(PoolName),
|
||||
?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
|
||||
(fun() ->
|
||||
QueryNoParamsResWrapper =
|
||||
emqx_resource:query(PoolName, test_query_no_params()),
|
||||
?assertMatch({ok, _}, QueryNoParamsResWrapper),
|
||||
{_, QueryNoParamsRes} = QueryNoParamsResWrapper,
|
||||
?assertMatch(<<"1">>, string:trim(QueryNoParamsRes))
|
||||
end)(),
|
||||
% Stop and remove the resource in one go.
|
||||
?assertEqual(ok, emqx_resource:remove_local(PoolName)),
|
||||
?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
|
||||
% Should not even be able to get the resource data out of ets now unlike just stopping.
|
||||
?assertEqual({error, not_found}, emqx_resource:get_instance(PoolName)).
|
||||
|
||||
% %%------------------------------------------------------------------------------
|
||||
% %% Helpers
|
||||
% %%------------------------------------------------------------------------------
|
||||
|
||||
clickhouse_config() ->
|
||||
Config =
|
||||
#{
|
||||
auto_reconnect => true,
|
||||
database => <<"mqtt">>,
|
||||
username => <<"default">>,
|
||||
password => <<"public">>,
|
||||
pool_size => 8,
|
||||
url => iolist_to_binary(
|
||||
io_lib:format(
|
||||
"http://~s:~b",
|
||||
[
|
||||
?CLICKHOUSE_HOST,
|
||||
?CLICKHOUSE_DEFAULT_PORT
|
||||
]
|
||||
)
|
||||
)
|
||||
},
|
||||
#{<<"config">> => Config}.
|
||||
|
||||
test_query_no_params() ->
|
||||
{query, <<"SELECT 1">>}.
|
3
mix.exs
3
mix.exs
|
@ -89,7 +89,8 @@ defmodule EMQXUmbrella.MixProject do
|
|||
{:ranch,
|
||||
github: "ninenines/ranch", ref: "a692f44567034dacf5efcaa24a24183788594eb7", override: true},
|
||||
# in conflict by grpc and eetcd
|
||||
{:gpb, "4.19.5", override: true, runtime: false}
|
||||
{:gpb, "4.19.5", override: true, runtime: false},
|
||||
{:hackney, github: "benoitc/hackney", tag: "1.18.1", override: true}
|
||||
] ++
|
||||
umbrella_apps() ++
|
||||
enterprise_apps(profile_info) ++
|
||||
|
|
|
@ -74,6 +74,7 @@
|
|||
, {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}}
|
||||
, {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}}
|
||||
, {telemetry, "1.1.0"}
|
||||
, {hackney, {git, "https://github.com/benoitc/hackney", {tag, "1.18.1"}}}
|
||||
]}.
|
||||
|
||||
{xref_ignores,
|
||||
|
|
|
@ -161,6 +161,9 @@ for dep in ${CT_DEPS}; do
|
|||
;;
|
||||
tdengine)
|
||||
FILES+=( '.ci/docker-compose-file/docker-compose-tdengine-restful.yaml' )
|
||||
;;
|
||||
clickhouse)
|
||||
FILES+=( '.ci/docker-compose-file/docker-compose-clickhouse.yaml' )
|
||||
;;
|
||||
*)
|
||||
echo "unknown_ct_dependency $dep"
|
||||
|
@ -194,7 +197,7 @@ if [[ -t 1 ]]; then
|
|||
fi
|
||||
|
||||
function restore_ownership {
|
||||
if ! sudo chown -R "$ORIG_UID_GID" . >/dev/null 2>&1; then
|
||||
if [[ -n ${EMQX_TEST_DO_NOT_RUN_SUDO+x} ]] || ! sudo chown -R "$ORIG_UID_GID" . >/dev/null 2>&1; then
|
||||
docker exec -i $TTY -u root:root "$ERLANG_CONTAINER" bash -c "chown -R $ORIG_UID_GID /emqx" >/dev/null 2>&1 || true
|
||||
fi
|
||||
}
|
||||
|
|
|
@ -267,3 +267,5 @@ keytab
|
|||
jq
|
||||
nif
|
||||
TDengine
|
||||
clickhouse
|
||||
FormatType
|
||||
|
|
Loading…
Reference in New Issue