From 0090a3ee9393f00ecfd6285dab79979a206eacda Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 10 Aug 2022 11:47:14 +0800 Subject: [PATCH 01/11] fix(influxdb): fix illegal `wirte_syntax` example --- lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl index 83c5a4127..53d02fa6d 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl @@ -51,7 +51,7 @@ values(Protocol, get) -> values(Protocol, post) -> case Protocol of "influxdb_api_v2" -> - SupportUint = <<"uint_value=${payload.uint_key}u">>; + SupportUint = <<"uint_value=${payload.uint_key}u,">>; _ -> SupportUint = <<>> end, From 223b84017edd0d557498a184678c15b3590a6340 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 11 Aug 2022 11:35:04 +0800 Subject: [PATCH 02/11] fix(influxdb): api schema `write_syntax` using raw type `string()` --- apps/emqx_dashboard/src/emqx_dashboard_swagger.erl | 7 +++++++ lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl | 11 ++++++++--- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl index 59b320368..68514fdb0 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl @@ -656,6 +656,13 @@ typename_to_spec("file()", _Mod) -> #{type => string, example => <<"/path/to/file">>}; typename_to_spec("ip_port()", _Mod) -> #{type => string, example => <<"127.0.0.1:80">>}; +typename_to_spec("write_syntax()", _Mod) -> + #{ + type => string, + example => + <<"${topic},clientid=${clientid}", " ", "payload=${payload},", + "${clientid}_int_value=${payload.int_key}i,", "bool=${payload.bool}">> + }; typename_to_spec("ip_ports()", _Mod) -> #{type => string, example => <<"127.0.0.1:80, 127.0.0.2:80">>}; typename_to_spec("url()", _Mod) -> diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl index 53d02fa6d..7de640040 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl @@ -21,6 +21,11 @@ desc/1 ]). +-type write_syntax() :: list(). +-reflect_type([write_syntax/0]). +-typerefl_from_string({write_syntax/0, ?MODULE, to_influx_lines}). +-export([to_influx_lines/1]). + %% ------------------------------------------------------------------------------------------------- %% api @@ -148,19 +153,19 @@ desc(_) -> undefined. write_syntax(type) -> - list(); + ?MODULE:write_syntax(); write_syntax(required) -> true; write_syntax(validator) -> [?NOT_EMPTY("the value of the field 'write_syntax' cannot be empty")]; write_syntax(converter) -> - fun converter_influx_lines/1; + fun to_influx_lines/1; write_syntax(desc) -> ?DESC("write_syntax"); write_syntax(_) -> undefined. -converter_influx_lines(RawLines) -> +to_influx_lines(RawLines) -> Lines = string:tokens(str(RawLines), "\n"), lists:reverse(lists:foldl(fun converter_influx_line/2, [], Lines)). From 22a4ca311c9e9320260d90d9d2eb2ecc2c1d6cd5 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 11 Aug 2022 15:09:05 +0800 Subject: [PATCH 03/11] feat(resource): resource batch/async/queue config schema --- .../i18n/emqx_resource_schema_i18n.conf | 119 ++++++++++++++++++ apps/emqx_resource/include/emqx_resource.hrl | 12 ++ .../src/emqx_resource_worker.erl | 17 +-- .../src/schema/emqx_resource_schema.erl | 99 +++++++++++++++ .../test/emqx_resource_SUITE.erl | 2 +- 5 files changed, 235 insertions(+), 14 deletions(-) create mode 100644 apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf create mode 100644 apps/emqx_resource/src/schema/emqx_resource_schema.erl diff --git a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf new file mode 100644 index 000000000..a3fb6c402 --- /dev/null +++ b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf @@ -0,0 +1,119 @@ +emqx_resource_schema { + batch { + desc { + en: """ +Configuration of batch query.
+Batch requests are made immediately when the number of requests reaches the `batch_size`, or also immediately when the number of requests is less than the batch request size but the maximum batch_time has been reached. +""" + zh: """ +批量请求配置。
+请求数达到批量请求大小时立刻进行批量请求,或当请求数不足批量请求数大小,但已经达到最大批量等待时间时也立即进行批量请求。 +""" + } + label { + en: """batch""" + zh: """批量请求""" + } + } + + queue { + desc { + en: """Configuration of queue.""" + zh: """请求队列配置""" + } + label { + en: """queue""" + zh: """请求队列""" + } + } + + query_mode { + desc { + en: """Query mode. Optional 'sync/async', default 'sync'.""" + zh: """请求模式。可选 '同步/异步',默认为'同步'模式。""" + } + label { + en: """query_mode""" + zh: """query_mode""" + } + } + + enable_batch { + desc { + en: """Batch mode enabled.""" + zh: """启用批量模式。""" + } + label { + en: """enable_batch""" + zh: """启用批量模式""" + } + } + + enable_queue { + desc { + en: """Queue mode enabled.""" + zh: """启用队列模式。""" + } + label { + en: """enable_queue""" + zh: """启用队列模式""" + } + } + + resume_interval { + desc { + en: """Resume time interval when resource down.""" + zh: """资源不可用时的重试时间""" + } + label { + en: """resume_interval""" + zh: """恢复时间""" + } + } + + async_inflight_window { + desc { + en: """Async queyr inflight window.""" + zh: """异步请求飞行队列窗口大小""" + } + label { + en: """async_inflight_window""" + zh: """异步请求飞行队列窗口""" + } + } + + batch_size { + desc { + en: """Maximum batch count.""" + zh: """批量请求大小""" + } + label { + en: """batch_size""" + zh: """批量请求大小""" + } + } + + batch_time { + desc { + en: """Maximum batch waiting interval.""" + zh: """最大批量请求等待时间。""" + } + label { + en: """batch_time""" + zh: """批量等待间隔""" + } + } + + queue_max_bytes { + desc { + en: """Maximum queue storage size in bytes.""" + zh: """消息队列的最大长度,以字节计。""" + } + label { + en: """queue_max_bytes""" + zh: """队列最大长度""" + } + } + + +} diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 5c561a8d3..5327e3aae 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -60,5 +60,17 @@ | {error, term()} | {resource_down, term()}. +%% count +-define(DEFAULT_BATCH_SIZE, 100). +%% milliseconds +-define(DEFAULT_BATCH_TIME, 10). + +%% bytes +-define(DEFAULT_QUEUE_SIZE, 1024 * 1024 * 1024). + +-define(DEFAULT_INFLIGHT, 100). + +-define(RESUME_INTERVAL, 15000). + -define(TEST_ID_PREFIX, "_test_:"). -define(RES_METRICS, resource_metrics). diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 716842bf9..e940dcb69 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -52,13 +52,6 @@ -export([reply_after_query/6, batch_reply_after_query/6]). --define(RESUME_INTERVAL, 15000). - -%% count --define(DEFAULT_BATCH_SIZE, 100). -%% milliseconds --define(DEFAULT_BATCH_TIME, 10). - -define(Q_ITEM(REQUEST), {q_item, REQUEST}). -define(QUERY(FROM, REQUEST), {query, FROM, REQUEST}). @@ -69,8 +62,6 @@ {error, {resource_error, #{reason => Reason, msg => iolist_to_binary(Msg)}}} ). -define(RESOURCE_ERROR_M(Reason, Msg), {error, {resource_error, #{reason := Reason, msg := Msg}}}). --define(DEFAULT_QUEUE_SIZE, 1024 * 1024 * 1024). --define(DEFAULT_INFLIGHT, 100). -type id() :: binary(). -type query() :: {query, from(), request()}. @@ -122,7 +113,7 @@ init({Id, Index, Opts}) -> Name = name(Id, Index), BatchSize = maps:get(batch_size, Opts, ?DEFAULT_BATCH_SIZE), Queue = - case maps:get(queue_enabled, Opts, false) of + case maps:get(enable_queue, Opts, false) of true -> replayq:open(#{ dir => disk_queue_dir(Id, Index), @@ -144,7 +135,7 @@ init({Id, Index, Opts}) -> %% if the resource worker is overloaded query_mode => maps:get(query_mode, Opts, sync), async_inflight_window => maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT), - batch_enabled => maps:get(batch_enabled, Opts, false), + enable_batch => maps:get(enable_batch, Opts, false), batch_size => BatchSize, batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME), queue => Queue, @@ -270,14 +261,14 @@ drop_head(Q) -> ok = replayq:ack(Q1, AckRef), Q1. -query_or_acc(From, Request, #{batch_enabled := true, acc := Acc, acc_left := Left} = St0) -> +query_or_acc(From, Request, #{enable_batch := true, acc := Acc, acc_left := Left} = St0) -> Acc1 = [?QUERY(From, Request) | Acc], St = St0#{acc := Acc1, acc_left := Left - 1}, case Left =< 1 of true -> flush(St); false -> {keep_state, ensure_flush_timer(St)} end; -query_or_acc(From, Request, #{batch_enabled := false, queue := Q, id := Id, query_mode := QM} = St) -> +query_or_acc(From, Request, #{enable_batch := false, queue := Q, id := Id, query_mode := QM} = St) -> QueryOpts = #{ inflight_name => maps:get(name, St), inflight_window => maps:get(async_inflight_window, St) diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl new file mode 100644 index 000000000..933cd0189 --- /dev/null +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -0,0 +1,99 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_resource_schema). + +-include("emqx_resource.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). + +-import(hoconsc, [mk/2, enum/1, ref/2]). + +-export([namespace/0, roots/0, fields/1]). + +%% ------------------------------------------------------------------------------------------------- +%% Hocon Schema Definitions + +namespace() -> "resource_schema". + +roots() -> []. + +fields('batch&async&queue') -> + [ + {query_mode, fun query_mode/1}, + {resume_interval, fun resume_interval/1}, + {async_inflight_window, fun async_inflight_window/1}, + {batch, mk(ref(?MODULE, batch), #{desc => ?DESC("batch")})}, + {queue, mk(ref(?MODULE, queue), #{desc => ?DESC("queue")})} + ]; +fields(batch) -> + [ + {enable_batch, fun enable_batch/1}, + {batch_size, fun batch_size/1}, + {batch_time, fun batch_time/1} + ]; +fields(queue) -> + [ + {enable_queue, fun enable_queue/1}, + {max_queue_bytes, fun queue_max_bytes/1} + ]. + +query_mode(type) -> enum([sync, async]); +query_mode(desc) -> ?DESC("query_mode"); +query_mode(default) -> sync; +query_mode(required) -> false; +query_mode(_) -> undefined. + +enable_batch(type) -> boolean(); +enable_batch(required) -> false; +enable_batch(default) -> false; +enable_batch(desc) -> ?DESC("enable_batch"); +enable_batch(_) -> undefined. + +enable_queue(type) -> boolean(); +enable_queue(required) -> false; +enable_queue(default) -> false; +enable_queue(desc) -> ?DESC("enable_queue"); +enable_queue(_) -> undefined. + +resume_interval(type) -> emqx_schema:duration_ms(); +resume_interval(desc) -> ?DESC("resume_interval"); +resume_interval(default) -> ?RESUME_INTERVAL; +resume_interval(required) -> false; +resume_interval(_) -> undefined. + +async_inflight_window(type) -> pos_integer(); +async_inflight_window(desc) -> ?DESC("async_inflight_window"); +async_inflight_window(default) -> ?DEFAULT_INFLIGHT; +async_inflight_window(required) -> false; +async_inflight_window(_) -> undefined. + +batch_size(type) -> pos_integer(); +batch_size(desc) -> ?DESC("batch_size"); +batch_size(default) -> ?DEFAULT_BATCH_SIZE; +batch_size(required) -> false; +batch_size(_) -> undefined. + +batch_time(type) -> emqx_schema:duration_ms(); +batch_time(desc) -> ?DESC("batch_time"); +batch_time(default) -> ?DEFAULT_BATCH_TIME; +batch_time(required) -> false; +batch_time(_) -> undefined. + +queue_max_bytes(type) -> emqx_schema:bytesize(); +queue_max_bytes(desc) -> ?DESC("queue_max_bytes"); +queue_max_bytes(default) -> ?DEFAULT_QUEUE_SIZE; +queue_max_bytes(required) -> false; +queue_max_bytes(_) -> undefined. diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 5c62e22c2..ddd671b75 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -211,7 +211,7 @@ t_batch_query_counter(_) -> ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, #{name => test_resource, register => true}, - #{batch_enabled => true} + #{enable_batch => true} ), ?check_trace( From 2843d33f7a33fa46bb12ae09b788944b99870403 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 11 Aug 2022 15:10:25 +0800 Subject: [PATCH 04/11] chore: refine md form format --- deploy/charts/emqx/README.md | 108 +++++++++++++++++------------------ 1 file changed, 54 insertions(+), 54 deletions(-) diff --git a/deploy/charts/emqx/README.md b/deploy/charts/emqx/README.md index 496d52061..acbbcf95e 100644 --- a/deploy/charts/emqx/README.md +++ b/deploy/charts/emqx/README.md @@ -1,5 +1,5 @@ # Introduction -This chart bootstraps an emqx deployment on a Kubernetes cluster using the Helm package manager. +This chart bootstraps an emqx deployment on a Kubernetes cluster using the Helm package manager. # Prerequisites + Kubernetes 1.6+ @@ -8,7 +8,7 @@ This chart bootstraps an emqx deployment on a Kubernetes cluster using the Helm # Installing the Chart To install the chart with the release name `my-emqx`: -+ From github ++ From github ``` $ git clone https://github.com/emqx/emqx.git $ cd emqx/deploy/charts/emqx @@ -31,58 +31,58 @@ $ helm del my-emqx # Configuration The following table lists the configurable parameters of the emqx chart and their default values. -| Parameter | Description | Default Value | -| --- | --- | --- | -| `replicaCount` | It is recommended to have odd number of nodes in a cluster, otherwise the emqx cluster cannot be automatically healed in case of net-split. |3| -| `image.repository` | EMQX Image name |emqx/emqx| -| `image.pullPolicy` | The image pull policy |IfNotPresent| -| `image.pullSecrets ` | The image pull secrets |`[]` (does not add image pull secrets to deployed pods)| -| `envFromSecret` | The name pull a secret in the same kubernetes namespace which contains values that will be added to the environment | nil | -| `recreatePods` | Forces the recreation of pods during upgrades, which can be useful to always apply the most recent configuration. | false | -`podAnnotations ` | Annotations for pod | `{}` -`podManagementPolicy`| To redeploy a chart with existing PVC(s), the value must be set to Parallel to avoid deadlock | `Parallel` -| `persistence.enabled` | Enable EMQX persistence using PVC |false| -| `persistence.storageClass` | Storage class of backing PVC |`nil` (uses alpha storage class annotation)| -| `persistence.existingClaim` | EMQX data Persistent Volume existing claim name, evaluated as a template |""| -| `persistence.accessMode` | PVC Access Mode for EMQX volume |ReadWriteOnce| -| `persistence.size` | PVC Storage Request for EMQX volume |20Mi| -| `initContainers` | Containers that run before the creation of EMQX containers. They can contain utilities or setup scripts. |`{}`| -| `resources` | CPU/Memory resource requests/limits |{}| -| `nodeSelector` | Node labels for pod assignment |`{}`| -| `tolerations` | Toleration labels for pod assignment |`[]`| -| `affinity` | Map of node/pod affinities |`{}`| -| `service.type` | Kubernetes Service type. |ClusterIP| -| `service.mqtt` | Port for MQTT. |1883| -| `service.mqttssl` | Port for MQTT(SSL). |8883| -| `service.mgmt` | Port for mgmt API. |8081| -| `service.ws` | Port for WebSocket/HTTP. |8083| -| `service.wss` | Port for WSS/HTTPS. |8084| -| `service.dashboard` | Port for dashboard. |18083| -| `service.nodePorts.mqtt` | Kubernetes node port for MQTT. |nil| -| `service.nodePorts.mqttssl` | Kubernetes node port for MQTT(SSL). |nil| -| `service.nodePorts.mgmt` | Kubernetes node port for mgmt API. |nil| -| `service.nodePorts.ws` | Kubernetes node port for WebSocket/HTTP. |nil| -| `service.nodePorts.wss` | Kubernetes node port for WSS/HTTPS. |nil| -| `service.nodePorts.dashboard` | Kubernetes node port for dashboard. |nil| -| `service.loadBalancerIP` | loadBalancerIP for Service | nil | -| `service.loadBalancerSourceRanges` | Address(es) that are allowed when service is LoadBalancer | [] | -| `service.externalIPs` | ExternalIPs for the service | [] | -| `service.annotations` | Service annotations | {}(evaluated as a template)| -| `ingress.dashboard.enabled` | Enable ingress for EMQX Dashboard | false | -| `ingress.dashboard.ingressClassName` | Set the ingress class for EMQX Dashboard | | -| `ingress.dashboard.path` | Ingress path for EMQX Dashboard | / | -| `ingress.dashboard.pathType` | Ingress pathType for EMQX Dashboard | `ImplementationSpecific` -| `ingress.dashboard.hosts` | Ingress hosts for EMQX Mgmt API | dashboard.emqx.local | -| `ingress.dashboard.tls` | Ingress tls for EMQX Mgmt API | [] | -| `ingress.dashboard.annotations` | Ingress annotations for EMQX Mgmt API | {} | -| `ingress.mgmt.enabled` | Enable ingress for EMQX Mgmt API | false | -| `ingress.dashboard.ingressClassName` | Set the ingress class for EMQX Mgmt API | | -| `ingress.mgmt.path` | Ingress path for EMQX Mgmt API | / | -| `ingress.mgmt.hosts` | Ingress hosts for EMQX Mgmt API | api.emqx.local | -| `ingress.mgmt.tls` | Ingress tls for EMQX Mgmt API | [] | -| `ingress.mgmt.annotations` | Ingress annotations for EMQX Mgmt API | {} | -| `metrics.enable` | If set to true, [prometheus-operator](https://github.com/prometheus-operator/prometheus-operator) needs to be installed, and emqx_prometheus needs to enable | false | -| `metrics.type` | Now we only supported "prometheus" | "prometheus" | +| Parameter | Description | Default Value | +|--------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------| +| `replicaCount` | It is recommended to have odd number of nodes in a cluster, otherwise the emqx cluster cannot be automatically healed in case of net-split. | 3 | +| `image.repository` | EMQX Image name | emqx/emqx | +| `image.pullPolicy` | The image pull policy | IfNotPresent | +| `image.pullSecrets ` | The image pull secrets | `[]` (does not add image pull secrets to deployed pods) | +| `envFromSecret` | The name pull a secret in the same kubernetes namespace which contains values that will be added to the environment | nil | +| `recreatePods` | Forces the recreation of pods during upgrades, which can be useful to always apply the most recent configuration. | false | +| `podAnnotations ` | Annotations for pod | `{}` | +| `podManagementPolicy` | To redeploy a chart with existing PVC(s), the value must be set to Parallel to avoid deadlock | `Parallel` | +| `persistence.enabled` | Enable EMQX persistence using PVC | false | +| `persistence.storageClass` | Storage class of backing PVC | `nil` (uses alpha storage class annotation) | +| `persistence.existingClaim` | EMQX data Persistent Volume existing claim name, evaluated as a template | "" | +| `persistence.accessMode` | PVC Access Mode for EMQX volume | ReadWriteOnce | +| `persistence.size` | PVC Storage Request for EMQX volume | 20Mi | +| `initContainers` | Containers that run before the creation of EMQX containers. They can contain utilities or setup scripts. | `{}` | +| `resources` | CPU/Memory resource requests/limits | {} | +| `nodeSelector` | Node labels for pod assignment | `{}` | +| `tolerations` | Toleration labels for pod assignment | `[]` | +| `affinity` | Map of node/pod affinities | `{}` | +| `service.type` | Kubernetes Service type. | ClusterIP | +| `service.mqtt` | Port for MQTT. | 1883 | +| `service.mqttssl` | Port for MQTT(SSL). | 8883 | +| `service.mgmt` | Port for mgmt API. | 8081 | +| `service.ws` | Port for WebSocket/HTTP. | 8083 | +| `service.wss` | Port for WSS/HTTPS. | 8084 | +| `service.dashboard` | Port for dashboard. | 18083 | +| `service.nodePorts.mqtt` | Kubernetes node port for MQTT. | nil | +| `service.nodePorts.mqttssl` | Kubernetes node port for MQTT(SSL). | nil | +| `service.nodePorts.mgmt` | Kubernetes node port for mgmt API. | nil | +| `service.nodePorts.ws` | Kubernetes node port for WebSocket/HTTP. | nil | +| `service.nodePorts.wss` | Kubernetes node port for WSS/HTTPS. | nil | +| `service.nodePorts.dashboard` | Kubernetes node port for dashboard. | nil | +| `service.loadBalancerIP` | loadBalancerIP for Service | nil | +| `service.loadBalancerSourceRanges` | Address(es) that are allowed when service is LoadBalancer | [] | +| `service.externalIPs` | ExternalIPs for the service | [] | +| `service.annotations` | Service annotations | {}(evaluated as a template) | +| `ingress.dashboard.enabled` | Enable ingress for EMQX Dashboard | false | +| `ingress.dashboard.ingressClassName` | Set the ingress class for EMQX Dashboard | | +| `ingress.dashboard.path` | Ingress path for EMQX Dashboard | / | +| `ingress.dashboard.pathType` | Ingress pathType for EMQX Dashboard | `ImplementationSpecific` | +| `ingress.dashboard.hosts` | Ingress hosts for EMQX Mgmt API | dashboard.emqx.local | +| `ingress.dashboard.tls` | Ingress tls for EMQX Mgmt API | [] | +| `ingress.dashboard.annotations` | Ingress annotations for EMQX Mgmt API | {} | +| `ingress.mgmt.enabled` | Enable ingress for EMQX Mgmt API | false | +| `ingress.dashboard.ingressClassName` | Set the ingress class for EMQX Mgmt API | | +| `ingress.mgmt.path` | Ingress path for EMQX Mgmt API | / | +| `ingress.mgmt.hosts` | Ingress hosts for EMQX Mgmt API | api.emqx.local | +| `ingress.mgmt.tls` | Ingress tls for EMQX Mgmt API | [] | +| `ingress.mgmt.annotations` | Ingress annotations for EMQX Mgmt API | {} | +| `metrics.enable` | If set to true, [prometheus-operator](https://github.com/prometheus-operator/prometheus-operator) needs to be installed, and emqx_prometheus needs to enable | false | +| `metrics.type` | Now we only supported "prometheus" | "prometheus" | ## EMQX specific settings The following table lists the configurable [EMQX](https://www.emqx.io/)-specific parameters of the chart and their default values. From 0f6c3717602e20f77635831ea25eaf90b3d2ff7e Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 11 Aug 2022 18:12:41 +0800 Subject: [PATCH 05/11] feat(influxdb): influxdb connector add `on_batch_query/3` callback --- apps/emqx_resource/src/emqx_resource.erl | 1 + .../src/emqx_ee_bridge_influxdb.erl | 9 +- .../src/emqx_ee_connector_influxdb.erl | 106 ++++++++++++++---- 3 files changed, 91 insertions(+), 25 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 0d2289696..6601b9eea 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -126,6 +126,7 @@ %% when calling emqx_resource:query/3 -callback on_query(resource_id(), Request :: term(), resource_state()) -> query_result(). +%% when calling emqx_resource:on_batch_query/3 -callback on_batch_query(resource_id(), Request :: term(), resource_state()) -> query_result(). %% when calling emqx_resource:health_check/2 diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl index 7de640040..4edeb786a 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl @@ -70,7 +70,8 @@ values(Protocol, post) -> write_syntax => <<"${topic},clientid=${clientid}", " ", "payload=${payload},", "${clientid}_int_value=${payload.int_key}i,", SupportUint/binary, - "bool=${payload.bool}">> + "bool=${payload.bool}">>, + batch => #{enable_batch => false, batch_size => 5, batch_time => <<"1m">>} }; values(Protocol, put) -> values(Protocol, post). @@ -109,7 +110,9 @@ fields("get_api_v2") -> fields(Name) when Name == influxdb_udp orelse Name == influxdb_api_v1 orelse Name == influxdb_api_v2 -> - fields(basic) ++ connector_field(Name). + fields(basic) ++ + emqx_resource_schema:fields('batch&async&queue') ++ + connector_field(Name). method_fileds(post, ConnectorType) -> fields(basic) ++ connector_field(ConnectorType) ++ type_name_field(ConnectorType); @@ -162,6 +165,8 @@ write_syntax(converter) -> fun to_influx_lines/1; write_syntax(desc) -> ?DESC("write_syntax"); +write_syntax(format) -> + <<"sql">>; write_syntax(_) -> undefined. diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl index 09b3d7350..41f7059ec 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl @@ -17,6 +17,7 @@ on_start/2, on_stop/2, on_query/3, + on_batch_query/3, on_get_status/2 ]). @@ -37,8 +38,29 @@ on_start(InstId, Config) -> on_stop(_InstId, #{client := Client}) -> influxdb:stop_client(Client). -on_query(InstId, {send_message, Data}, State) -> - do_query(InstId, {send_message, Data}, State). +on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, client := Client}) -> + case data_to_points(Data, SyntaxLines) of + {ok, Points} -> + do_query(InstId, Client, Points); + {error, ErrorPoints} = Err -> + log_error_points(InstId, ErrorPoints), + Err + end. + +%% Once a Batched Data trans to points failed. +%% This batch query failed +on_batch_query(InstId, BatchData, State = #{write_syntax := SyntaxLines, client := Client}) -> + case on_get_status(InstId, State) of + connected -> + case parse_batch_data(InstId, BatchData, SyntaxLines) of + {ok, Points} -> + do_query(InstId, Client, Points); + {error, Reason} -> + {error, Reason} + end; + disconnected -> + {resource_down, disconnected} + end. on_get_status(_InstId, #{client := Client}) -> case influxdb:is_alive(Client) of @@ -79,7 +101,7 @@ fields("api_v2_put") -> fields(basic) -> [ {host, - mk(binary(), #{required => true, default => <<"120.0.0.1">>, desc => ?DESC("host")})}, + mk(binary(), #{required => true, default => <<"127.0.0.1">>, desc => ?DESC("host")})}, {port, mk(pos_integer(), #{required => true, default => 8086, desc => ?DESC("port")})}, {precision, mk(enum([ns, us, ms, s, m, h]), #{ @@ -310,18 +332,7 @@ ssl_config(SSL = #{enable := true}) -> %% ------------------------------------------------------------------------------------------------- %% Query -do_query(InstId, {send_message, Data}, State = #{client := Client}) -> - {Points, Errs} = data_to_points(Data, State), - lists:foreach( - fun({error, Reason}) -> - ?SLOG(error, #{ - msg => "influxdb trans point failed", - connector => InstId, - reason => Reason - }) - end, - Errs - ), +do_query(InstId, Client, Points) -> case influxdb:write(Client, Points) of ok -> ?SLOG(debug, #{ @@ -376,11 +387,45 @@ to_maps_config(K, V, Res) -> %% ------------------------------------------------------------------------------------------------- %% Tags & Fields Data Trans -data_to_points(Data, #{write_syntax := Lines}) -> - lines_to_points(Data, Lines, [], []). +parse_batch_data(InstId, BatchData, SyntaxLines) -> + {Points, Errors} = lists:foldl( + fun({send_message, Data}, {AccIn, ErrAccIn}) -> + case data_to_points(Data, SyntaxLines) of + {ok, Points} -> + {[Points | AccIn], ErrAccIn}; + {error, ErrorPoints} -> + log_error_points(InstId, ErrorPoints), + {AccIn, ErrAccIn + 1} + end + end, + {[], 0}, + BatchData + ), + case Errors of + 0 -> + {ok, Points}; + _ -> + ?SLOG(error, #{ + msg => io_lib:format("InfluxDB trans point failed, count: ~p", [Errors]), + connector => InstId, + reason => points_trans_failed + }), + {error, points_trans_failed} + end. -lines_to_points(_, [], Points, Errs) -> - {Points, Errs}; +data_to_points(Data, SyntaxLines) -> + lines_to_points(Data, SyntaxLines, [], []). + +%% When converting multiple rows data into InfluxDB Line Protocol, they are considered to be strongly correlated. +%% And once a row fails to convert, all of them are considered to have failed. +lines_to_points(_, [], Points, ErrorPoints) -> + case ErrorPoints of + [] -> + {ok, Points}; + _ -> + %% ignore trans succeeded points + {error, ErrorPoints} + end; lines_to_points( Data, [ @@ -392,8 +437,8 @@ lines_to_points( } | Rest ], - ResAcc, - ErrAcc + ResultPointsAcc, + ErrorPointsAcc ) -> TransOptions = #{return => rawlist, var_trans => fun data_filter/1}, case emqx_plugin_libs_rule:proc_tmpl(Timestamp, Data, TransOptions) of @@ -406,9 +451,11 @@ lines_to_points( tags => EncodeTags, fields => EncodeFields }, - lines_to_points(Data, Rest, [Point | ResAcc], ErrAcc); + lines_to_points(Data, Rest, [Point | ResultPointsAcc], ErrorPointsAcc); BadTimestamp -> - lines_to_points(Data, Rest, ResAcc, [{error, {bad_timestamp, BadTimestamp}} | ErrAcc]) + lines_to_points(Data, Rest, ResultPointsAcc, [ + {error, {bad_timestamp, BadTimestamp}} | ErrorPointsAcc + ]) end. maps_config_to_data(K, V, {Data, Res}) -> @@ -461,3 +508,16 @@ data_filter(Bool) when is_boolean(Bool) -> Bool; data_filter(Data) -> bin(Data). bin(Data) -> emqx_plugin_libs_rule:bin(Data). + +%% helper funcs +log_error_points(InstId, Errs) -> + lists:foreach( + fun({error, Reason}) -> + ?SLOG(error, #{ + msg => "influxdb trans point failed", + connector => InstId, + reason => Reason + }) + end, + Errs + ). From 2872f0b6685061fa4242d829b6f33cdf88e5f76c Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Thu, 11 Aug 2022 19:11:44 +0800 Subject: [PATCH 06/11] fix(bridges): support create resources with options --- apps/emqx_bridge/src/emqx_bridge.erl | 9 +++-- apps/emqx_bridge/src/emqx_bridge_resource.erl | 11 +++-- .../i18n/emqx_resource_schema_i18n.conf | 28 ------------- apps/emqx_resource/include/emqx_resource.hrl | 12 +++++- apps/emqx_resource/src/emqx_resource.erl | 40 ++++++++++++++----- .../src/emqx_resource_manager.erl | 8 ++-- .../src/proto/emqx_resource_proto_v1.erl | 4 +- .../src/schema/emqx_resource_schema.erl | 12 +----- .../src/emqx_ee_bridge_influxdb.erl | 2 +- 9 files changed, 61 insertions(+), 65 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index ba6c64dbc..c794a25a5 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -171,9 +171,9 @@ post_config_update(_, _Req, NewConf, OldConf, _AppEnv) -> diff_confs(NewConf, OldConf), %% The config update will be failed if any task in `perform_bridge_changes` failed. Result = perform_bridge_changes([ - {fun emqx_bridge_resource:remove/3, Removed}, - {fun emqx_bridge_resource:create/3, Added}, - {fun emqx_bridge_resource:update/3, Updated} + {fun emqx_bridge_resource:remove/4, Removed}, + {fun emqx_bridge_resource:create/4, Added}, + {fun emqx_bridge_resource:update/4, Updated} ]), ok = unload_hook(), ok = load_hook(NewConf), @@ -261,7 +261,8 @@ perform_bridge_changes([{Action, MapConfs} | Tasks], Result0) -> ({_Type, _Name}, _Conf, {error, Reason}) -> {error, Reason}; ({Type, Name}, Conf, _) -> - case Action(Type, Name, Conf) of + ResOpts = emqx_resource:fetch_creation_opts(Conf), + case Action(Type, Name, Conf, ResOpts) of {error, Reason} -> {error, Reason}; Return -> Return end diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index 35ace560c..ac1ec6ba3 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -158,11 +158,14 @@ recreate(Type, Name) -> recreate(Type, Name, emqx:get_config([bridges, Type, Name])). recreate(Type, Name, Conf) -> + recreate(Type, Name, Conf, #{}). + +recreate(Type, Name, Conf, Opts) -> emqx_resource:recreate_local( resource_id(Type, Name), bridge_to_resource_type(Type), parse_confs(Type, Name, Conf), - #{auto_retry_interval => 60000} + Opts#{auto_retry_interval => 60000} ). create_dry_run(Type, Conf) -> @@ -186,13 +189,13 @@ create_dry_run(Type, Conf) -> remove(BridgeId) -> {BridgeType, BridgeName} = parse_bridge_id(BridgeId), - remove(BridgeType, BridgeName, #{}). + remove(BridgeType, BridgeName, #{}, #{}). remove(Type, Name) -> - remove(Type, Name, undefined). + remove(Type, Name, #{}, #{}). %% just for perform_bridge_changes/1 -remove(Type, Name, _Conf) -> +remove(Type, Name, _Conf, _Opts) -> ?SLOG(info, #{msg => "remove_bridge", type => Type, name => Name}), case emqx_resource:remove_local(resource_id(Type, Name)) of ok -> ok; diff --git a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf index a3fb6c402..abdb220bb 100644 --- a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf +++ b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf @@ -1,32 +1,4 @@ emqx_resource_schema { - batch { - desc { - en: """ -Configuration of batch query.
-Batch requests are made immediately when the number of requests reaches the `batch_size`, or also immediately when the number of requests is less than the batch request size but the maximum batch_time has been reached. -""" - zh: """ -批量请求配置。
-请求数达到批量请求大小时立刻进行批量请求,或当请求数不足批量请求数大小,但已经达到最大批量等待时间时也立即进行批量请求。 -""" - } - label { - en: """batch""" - zh: """批量请求""" - } - } - - queue { - desc { - en: """Configuration of queue.""" - zh: """请求队列配置""" - } - label { - en: """queue""" - zh: """请求队列""" - } - } - query_mode { desc { en: """Query mode. Optional 'sync/async', default 'sync'.""" diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 5327e3aae..5b6856dc0 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -40,7 +40,7 @@ metrics := emqx_metrics_worker:metrics() }. -type resource_group() :: binary(). --type create_opts() :: #{ +-type creation_opts() :: #{ health_check_interval => integer(), health_check_timeout => integer(), %% We can choose to block the return of emqx_resource:start until @@ -52,7 +52,15 @@ start_after_created => boolean(), %% If the resource disconnected, we can set to retry starting the resource %% periodically. - auto_retry_interval => integer() + auto_retry_interval => integer(), + enable_batch => boolean(), + batch_size => integer(), + batch_time => integer(), + enable_queue => boolean(), + queue_max_bytes => integer(), + query_mode => async | sync | dynamic + resume_interval => integer(), + async_inflight_window => integer() }. -type query_result() :: ok diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 6601b9eea..b134d2af1 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -102,6 +102,7 @@ list_instances_verbose/0, %% return the data of the instance get_instance/1, + fetch_creation_opts/1, %% return all the instances of the same resource type list_instances_by_type/1, generate_id/1, @@ -159,7 +160,7 @@ is_resource_mod(Module) -> create(ResId, Group, ResourceType, Config) -> create(ResId, Group, ResourceType, Config, #{}). --spec create(resource_id(), resource_group(), resource_type(), resource_config(), create_opts()) -> +-spec create(resource_id(), resource_group(), resource_type(), resource_config(), creation_opts()) -> {ok, resource_data() | 'already_created'} | {error, Reason :: term()}. create(ResId, Group, ResourceType, Config, Opts) -> emqx_resource_proto_v1:create(ResId, Group, ResourceType, Config, Opts). @@ -175,7 +176,7 @@ create_local(ResId, Group, ResourceType, Config) -> resource_group(), resource_type(), resource_config(), - create_opts() + creation_opts() ) -> {ok, resource_data()}. create_local(ResId, Group, ResourceType, Config, Opts) -> @@ -196,7 +197,7 @@ create_dry_run_local(ResourceType, Config) -> recreate(ResId, ResourceType, Config) -> recreate(ResId, ResourceType, Config, #{}). --spec recreate(resource_id(), resource_type(), resource_config(), create_opts()) -> +-spec recreate(resource_id(), resource_type(), resource_config(), creation_opts()) -> {ok, resource_data()} | {error, Reason :: term()}. recreate(ResId, ResourceType, Config, Opts) -> emqx_resource_proto_v1:recreate(ResId, ResourceType, Config, Opts). @@ -206,7 +207,7 @@ recreate(ResId, ResourceType, Config, Opts) -> recreate_local(ResId, ResourceType, Config) -> recreate_local(ResId, ResourceType, Config, #{}). --spec recreate_local(resource_id(), resource_type(), resource_config(), create_opts()) -> +-spec recreate_local(resource_id(), resource_type(), resource_config(), creation_opts()) -> {ok, resource_data()} | {error, Reason :: term()}. recreate_local(ResId, ResourceType, Config, Opts) -> emqx_resource_manager:recreate(ResId, ResourceType, Config, Opts). @@ -249,7 +250,7 @@ simple_async_query(ResId, Request, ReplyFun) -> start(ResId) -> start(ResId, #{}). --spec start(resource_id(), create_opts()) -> ok | {error, Reason :: term()}. +-spec start(resource_id(), creation_opts()) -> ok | {error, Reason :: term()}. start(ResId, Opts) -> emqx_resource_manager:start(ResId, Opts). @@ -257,7 +258,7 @@ start(ResId, Opts) -> restart(ResId) -> restart(ResId, #{}). --spec restart(resource_id(), create_opts()) -> ok | {error, Reason :: term()}. +-spec restart(resource_id(), creation_opts()) -> ok | {error, Reason :: term()}. restart(ResId, Opts) -> emqx_resource_manager:restart(ResId, Opts). @@ -277,6 +278,25 @@ set_resource_status_connecting(ResId) -> get_instance(ResId) -> emqx_resource_manager:lookup(ResId). +-spec fetch_creation_opts(map()) -> creation_opts(). +fetch_creation_opts(Opts) -> + SupportedOpts = [ + health_check_interval, + health_check_timeout, + wait_for_resource_ready, + start_after_created, + auto_retry_interval, + enable_batch, + batch_size, + batch_time, + enable_queue, + queue_max_bytes, + query_mode, + resume_interval, + async_inflight_window + ], + maps:with(creation_opts(), SupportedOpts). + -spec list_instances() -> [resource_id()]. list_instances() -> [Id || #{id := Id} <- list_instances_verbose()]. @@ -341,7 +361,7 @@ check_and_create(ResId, Group, ResourceType, RawConfig) -> resource_group(), resource_type(), raw_resource_config(), - create_opts() + creation_opts() ) -> {ok, resource_data() | 'already_created'} | {error, term()}. check_and_create(ResId, Group, ResourceType, RawConfig, Opts) -> @@ -366,7 +386,7 @@ check_and_create_local(ResId, Group, ResourceType, RawConfig) -> resource_group(), resource_type(), raw_resource_config(), - create_opts() + creation_opts() ) -> {ok, resource_data()} | {error, term()}. check_and_create_local(ResId, Group, ResourceType, RawConfig, Opts) -> check_and_do( @@ -379,7 +399,7 @@ check_and_create_local(ResId, Group, ResourceType, RawConfig, Opts) -> resource_id(), resource_type(), raw_resource_config(), - create_opts() + creation_opts() ) -> {ok, resource_data()} | {error, term()}. check_and_recreate(ResId, ResourceType, RawConfig, Opts) -> @@ -393,7 +413,7 @@ check_and_recreate(ResId, ResourceType, RawConfig, Opts) -> resource_id(), resource_type(), raw_resource_config(), - create_opts() + creation_opts() ) -> {ok, resource_data()} | {error, term()}. check_and_recreate_local(ResId, ResourceType, RawConfig, Opts) -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 3310555d1..608548898 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -85,7 +85,7 @@ manager_id_to_resource_id(MgrId) -> resource_group(), resource_type(), resource_config(), - create_opts() + creation_opts() ) -> {ok, resource_data()}. ensure_resource(ResId, Group, ResourceType, Config, Opts) -> case lookup(ResId) of @@ -97,7 +97,7 @@ ensure_resource(ResId, Group, ResourceType, Config, Opts) -> end. %% @doc Called from emqx_resource when recreating a resource which may or may not exist --spec recreate(resource_id(), resource_type(), resource_config(), create_opts()) -> +-spec recreate(resource_id(), resource_type(), resource_config(), creation_opts()) -> {ok, resource_data()} | {error, not_found} | {error, updating_to_incorrect_resource_type}. recreate(ResId, ResourceType, NewConfig, Opts) -> case lookup(ResId) of @@ -166,7 +166,7 @@ remove(ResId, ClearMetrics) when is_binary(ResId) -> safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION). %% @doc Stops and then starts an instance that was already running --spec restart(resource_id(), create_opts()) -> ok | {error, Reason :: term()}. +-spec restart(resource_id(), creation_opts()) -> ok | {error, Reason :: term()}. restart(ResId, Opts) when is_binary(ResId) -> case safe_call(ResId, restart, ?T_OPERATION) of ok -> @@ -177,7 +177,7 @@ restart(ResId, Opts) when is_binary(ResId) -> end. %% @doc Start the resource --spec start(resource_id(), create_opts()) -> ok | {error, Reason :: term()}. +-spec start(resource_id(), creation_opts()) -> ok | {error, Reason :: term()}. start(ResId, Opts) -> case safe_call(ResId, start, ?T_OPERATION) of ok -> diff --git a/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl b/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl index cdd2592d9..11af1a62c 100644 --- a/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl +++ b/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl @@ -38,7 +38,7 @@ introduced_in() -> resource_group(), resource_type(), resource_config(), - create_opts() + creation_opts() ) -> {ok, resource_data() | 'already_created'} | {error, Reason :: term()}. create(ResId, Group, ResourceType, Config, Opts) -> @@ -58,7 +58,7 @@ create_dry_run(ResourceType, Config) -> resource_id(), resource_type(), resource_config(), - create_opts() + creation_opts() ) -> {ok, resource_data()} | {error, Reason :: term()}. recreate(ResId, ResourceType, Config, Opts) -> diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index 933cd0189..464c055b7 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -30,22 +30,14 @@ namespace() -> "resource_schema". roots() -> []. -fields('batch&async&queue') -> +fields('creation_opts') -> [ {query_mode, fun query_mode/1}, {resume_interval, fun resume_interval/1}, {async_inflight_window, fun async_inflight_window/1}, - {batch, mk(ref(?MODULE, batch), #{desc => ?DESC("batch")})}, - {queue, mk(ref(?MODULE, queue), #{desc => ?DESC("queue")})} - ]; -fields(batch) -> - [ {enable_batch, fun enable_batch/1}, {batch_size, fun batch_size/1}, - {batch_time, fun batch_time/1} - ]; -fields(queue) -> - [ + {batch_time, fun batch_time/1}, {enable_queue, fun enable_queue/1}, {max_queue_bytes, fun queue_max_bytes/1} ]. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl index 4edeb786a..23c6788e8 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl @@ -111,7 +111,7 @@ fields(Name) when Name == influxdb_udp orelse Name == influxdb_api_v1 orelse Name == influxdb_api_v2 -> fields(basic) ++ - emqx_resource_schema:fields('batch&async&queue') ++ + emqx_resource_schema:fields('creation_opts') ++ connector_field(Name). method_fileds(post, ConnectorType) -> From 3a76a50382a55ddc1b394bd9d3d671c56b0e2968 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 11 Aug 2022 19:45:52 +0800 Subject: [PATCH 07/11] fix: syntax error and compile error --- apps/emqx_bridge/src/emqx_bridge_resource.erl | 2 +- apps/emqx_resource/include/emqx_resource.hrl | 2 +- apps/emqx_resource/src/emqx_resource.erl | 2 +- lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl | 4 +++- 4 files changed, 6 insertions(+), 4 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index ac1ec6ba3..66c4524b0 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -34,7 +34,7 @@ create_dry_run/2, remove/1, remove/2, - remove/3, + remove/4, update/2, update/3, stop/2, diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 5b6856dc0..ed8929831 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -58,7 +58,7 @@ batch_time => integer(), enable_queue => boolean(), queue_max_bytes => integer(), - query_mode => async | sync | dynamic + query_mode => async | sync | dynamic, resume_interval => integer(), async_inflight_window => integer() }. diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index b134d2af1..c4fd24007 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -295,7 +295,7 @@ fetch_creation_opts(Opts) -> resume_interval, async_inflight_window ], - maps:with(creation_opts(), SupportedOpts). + maps:with(SupportedOpts, Opts). -spec list_instances() -> [resource_id()]. list_instances() -> diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl index 23c6788e8..ece2a6bc7 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl @@ -71,7 +71,9 @@ values(Protocol, post) -> <<"${topic},clientid=${clientid}", " ", "payload=${payload},", "${clientid}_int_value=${payload.int_key}i,", SupportUint/binary, "bool=${payload.bool}">>, - batch => #{enable_batch => false, batch_size => 5, batch_time => <<"1m">>} + enable_batch => false, + batch_size => 5, + batch_time => <<"1m">> }; values(Protocol, put) -> values(Protocol, post). From db3e4f0b906aa811c6fec268381f15fc9a2ff2ec Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 11 Aug 2022 20:58:56 +0800 Subject: [PATCH 08/11] fix: flatten Points list --- .../emqx_ee_connector/src/emqx_ee_connector_influxdb.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl index 41f7059ec..0a2bbb638 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl @@ -389,13 +389,13 @@ to_maps_config(K, V, Res) -> %% Tags & Fields Data Trans parse_batch_data(InstId, BatchData, SyntaxLines) -> {Points, Errors} = lists:foldl( - fun({send_message, Data}, {AccIn, ErrAccIn}) -> + fun({send_message, Data}, {ListOfPoints, ErrAccIn}) -> case data_to_points(Data, SyntaxLines) of {ok, Points} -> - {[Points | AccIn], ErrAccIn}; + {[Points | ListOfPoints], ErrAccIn}; {error, ErrorPoints} -> log_error_points(InstId, ErrorPoints), - {AccIn, ErrAccIn + 1} + {ListOfPoints, ErrAccIn + 1} end end, {[], 0}, @@ -403,7 +403,7 @@ parse_batch_data(InstId, BatchData, SyntaxLines) -> ), case Errors of 0 -> - {ok, Points}; + {ok, lists:flatten(Points)}; _ -> ?SLOG(error, #{ msg => io_lib:format("InfluxDB trans point failed, count: ~p", [Errors]), From ed796acb9558ca7ec74b203c7a42903f10d04933 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 11 Aug 2022 22:11:28 +0800 Subject: [PATCH 09/11] fix: fetch resource options after reboot --- apps/emqx_bridge/src/emqx_bridge.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index c794a25a5..08939d915 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -49,14 +49,14 @@ -export([get_basic_usage_info/0]). load() -> - %% set wait_for_resource_ready => 0 to start resources async - Opts = #{auto_retry_interval => 60000, wait_for_resource_ready => 0}, Bridges = emqx:get_config([bridges], #{}), lists:foreach( fun({Type, NamedConf}) -> lists:foreach( fun({Name, Conf}) -> - safe_load_bridge(Type, Name, Conf, Opts) + %% fetch opts for `emqx_resource_worker` + ResOpts = emqx_resource:fetch_creation_opts(Conf), + safe_load_bridge(Type, Name, Conf, ResOpts) end, maps:to_list(NamedConf) ) From 458dab53c5434fe2c9e034e48b100e2c8793608a Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 11 Aug 2022 22:54:32 +0800 Subject: [PATCH 10/11] fix: undefined_functions dialyzer warning --- apps/emqx_bridge/src/emqx_bridge_resource.erl | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index 66c4524b0..d34c30ee9 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -37,6 +37,7 @@ remove/4, update/2, update/3, + update/4, stop/2, restart/2, reset_metrics/1 @@ -111,6 +112,9 @@ update(BridgeId, {OldConf, Conf}) -> update(BridgeType, BridgeName, {OldConf, Conf}). update(Type, Name, {OldConf, Conf}) -> + update(Type, Name, {OldConf, Conf}, #{}). + +update(Type, Name, {OldConf, Conf}, Opts) -> %% TODO: sometimes its not necessary to restart the bridge connection. %% %% - if the connection related configs like `servers` is updated, we should restart/start @@ -127,7 +131,7 @@ update(Type, Name, {OldConf, Conf}) -> name => Name, config => Conf }), - case recreate(Type, Name, Conf) of + case recreate(Type, Name, Conf, Opts) of {ok, _} -> maybe_disable_bridge(Type, Name, Conf); {error, not_found} -> @@ -137,7 +141,7 @@ update(Type, Name, {OldConf, Conf}) -> name => Name, config => Conf }), - create(Type, Name, Conf); + create(Type, Name, Conf, Opts); {error, Reason} -> {error, {update_bridge_failed, Reason}} end; From 83746daad529e08131774f795e04b392d6ef5ce8 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 11 Aug 2022 22:59:27 +0800 Subject: [PATCH 11/11] fix: update bridge config badmap error --- apps/emqx_bridge/src/emqx_bridge.erl | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 08939d915..354c4faee 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -260,6 +260,13 @@ perform_bridge_changes([{Action, MapConfs} | Tasks], Result0) -> fun ({_Type, _Name}, _Conf, {error, Reason}) -> {error, Reason}; + %% for emqx_bridge_resource:update/4 + ({Type, Name}, {OldConf, Conf}, _) -> + ResOpts = emqx_resource:fetch_creation_opts(Conf), + case Action(Type, Name, {OldConf, Conf}, ResOpts) of + {error, Reason} -> {error, Reason}; + Return -> Return + end; ({Type, Name}, Conf, _) -> ResOpts = emqx_resource:fetch_creation_opts(Conf), case Action(Type, Name, Conf, ResOpts) of