From e49d3ca50c521ec500a0ea8c7f00710c2845efb1 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Thu, 11 Jan 2024 02:15:03 +0800 Subject: [PATCH 1/2] feat: support elasticsearch bridge --- apps/emqx_bridge/src/emqx_action_info.erl | 3 +- apps/emqx_bridge_es/.gitignore | 19 + apps/emqx_bridge_es/BSL.txt | 94 ++++ apps/emqx_bridge_es/README.md | 23 + apps/emqx_bridge_es/docker-ct | 1 + apps/emqx_bridge_es/etc/emqx_bridge_es.conf | 0 .../emqx_bridge_es/include/emqx_bridge_es.hrl | 8 + apps/emqx_bridge_es/rebar.config | 15 + .../emqx_bridge_es/src/emqx_bridge_es.app.src | 23 + apps/emqx_bridge_es/src/emqx_bridge_es.erl | 312 +++++++++++ .../src/emqx_bridge_es_action_info.erl | 22 + .../src/emqx_bridge_es_connector.erl | 498 ++++++++++++++++++ .../src/emqx_bridge_http_connector.erl | 28 +- .../src/emqx_bridge_iotdb_connector.erl | 9 +- .../src/schema/emqx_connector_ee_schema.erl | 18 +- .../src/schema/emqx_connector_schema.erl | 4 +- apps/emqx_machine/priv/reboot_lists.eterm | 1 + apps/emqx_machine/src/emqx_machine.app.src | 2 +- mix.exs | 1 + rel/i18n/emqx_bridge_es.hocon | 129 +++++ rel/i18n/emqx_bridge_es_connector.hocon | 44 ++ 21 files changed, 1234 insertions(+), 20 deletions(-) create mode 100644 apps/emqx_bridge_es/.gitignore create mode 100644 apps/emqx_bridge_es/BSL.txt create mode 100644 apps/emqx_bridge_es/README.md create mode 100644 apps/emqx_bridge_es/docker-ct create mode 100644 apps/emqx_bridge_es/etc/emqx_bridge_es.conf create mode 100644 apps/emqx_bridge_es/include/emqx_bridge_es.hrl create mode 100644 apps/emqx_bridge_es/rebar.config create mode 100644 apps/emqx_bridge_es/src/emqx_bridge_es.app.src create mode 100644 apps/emqx_bridge_es/src/emqx_bridge_es.erl create mode 100644 apps/emqx_bridge_es/src/emqx_bridge_es_action_info.erl create mode 100644 apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl create mode 100644 rel/i18n/emqx_bridge_es.hocon create mode 100644 rel/i18n/emqx_bridge_es_connector.hocon diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index ceba8e202..5ce60fe6c 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -87,7 +87,8 @@ hard_coded_action_info_modules_ee() -> emqx_bridge_syskeeper_action_info, emqx_bridge_timescale_action_info, emqx_bridge_redis_action_info, - emqx_bridge_iotdb_action_info + emqx_bridge_iotdb_action_info, + emqx_bridge_es_action_info ]. -else. hard_coded_action_info_modules_ee() -> diff --git a/apps/emqx_bridge_es/.gitignore b/apps/emqx_bridge_es/.gitignore new file mode 100644 index 000000000..e9bc1c544 --- /dev/null +++ b/apps/emqx_bridge_es/.gitignore @@ -0,0 +1,19 @@ +.rebar3 + _* + .eunit + *.o + *.beam + *.plt + *.swp + *.swo + .erlang.cookie + ebin + log + erl_crash.dump + .rebar + logs + _build + .idea + *.iml + rebar3.crashdump + *~ diff --git a/apps/emqx_bridge_es/BSL.txt b/apps/emqx_bridge_es/BSL.txt new file mode 100644 index 000000000..0acc0e696 --- /dev/null +++ b/apps/emqx_bridge_es/BSL.txt @@ -0,0 +1,94 @@ +Business Source License 1.1 + +Licensor: Hangzhou EMQ Technologies Co., Ltd. +Licensed Work: EMQX Enterprise Edition + The Licensed Work is (c) 2023 + Hangzhou EMQ Technologies Co., Ltd. +Additional Use Grant: Students and educators are granted right to copy, + modify, and create derivative work for research + or education. +Change Date: 2027-02-01 +Change License: Apache License, Version 2.0 + +For information about alternative licensing arrangements for the Software, +please contact Licensor: https://www.emqx.com/en/contact + +Notice + +The Business Source License (this document, or the “License”) is not an Open +Source license. However, the Licensed Work will eventually be made available +under an Open Source License, as stated in this License. + +License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved. +“Business Source License” is a trademark of MariaDB Corporation Ab. + +----------------------------------------------------------------------------- + +Business Source License 1.1 + +Terms + +The Licensor hereby grants you the right to copy, modify, create derivative +works, redistribute, and make non-production use of the Licensed Work. The +Licensor may make an Additional Use Grant, above, permitting limited +production use. + +Effective on the Change Date, or the fourth anniversary of the first publicly +available distribution of a specific version of the Licensed Work under this +License, whichever comes first, the Licensor hereby grants you rights under +the terms of the Change License, and the rights granted in the paragraph +above terminate. + +If your use of the Licensed Work does not comply with the requirements +currently in effect as described in this License, you must purchase a +commercial license from the Licensor, its affiliated entities, or authorized +resellers, or you must refrain from using the Licensed Work. + +All copies of the original and modified Licensed Work, and derivative works +of the Licensed Work, are subject to this License. This License applies +separately for each version of the Licensed Work and the Change Date may vary +for each version of the Licensed Work released by Licensor. + +You must conspicuously display this License on each original or modified copy +of the Licensed Work. If you receive the Licensed Work in original or +modified form from a third party, the terms and conditions set forth in this +License apply to your use of that work. + +Any use of the Licensed Work in violation of this License will automatically +terminate your rights under this License for the current and all other +versions of the Licensed Work. + +This License does not grant you any right in any trademark or logo of +Licensor or its affiliates (provided that you may use a trademark or logo of +Licensor as expressly required by this License). + +TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON +AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS, +EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND +TITLE. + +MariaDB hereby grants you permission to use this License’s text to license +your works, and to refer to it using the trademark “Business Source License”, +as long as you comply with the Covenants of Licensor below. + +Covenants of Licensor + +In consideration of the right to use this License’s text and the “Business +Source License” name and trademark, Licensor covenants to MariaDB, and to all +other recipients of the licensed work to be provided by Licensor: + +1. To specify as the Change License the GPL Version 2.0 or any later version, + or a license that is compatible with GPL Version 2.0 or a later version, + where “compatible” means that software provided under the Change License can + be included in a program with software provided under GPL Version 2.0 or a + later version. Licensor may specify additional Change Licenses without + limitation. + +2. To either: (a) specify an additional grant of rights to use that does not + impose any additional restriction on the right granted in this License, as + the Additional Use Grant; or (b) insert the text “None”. + +3. To specify a Change Date. + +4. Not to modify this License in any other way. diff --git a/apps/emqx_bridge_es/README.md b/apps/emqx_bridge_es/README.md new file mode 100644 index 000000000..b91af4aba --- /dev/null +++ b/apps/emqx_bridge_es/README.md @@ -0,0 +1,23 @@ +# Apache ElasticSearch Data Integration Bridge + +This application houses the ElasticSearch data integration bridge for EMQX Enterprise + Edition. It provides the means to connect to ElasticSearch and publish messages to it. + +It implements the connection management and interaction without need for a + separate connector app, since it's not used by authentication and authorization + applications. + + + +# Contributing +Please see our [contributing.md](../../CONTRIBUTING.md). + +# License + +See [BSL](./BSL.txt). diff --git a/apps/emqx_bridge_es/docker-ct b/apps/emqx_bridge_es/docker-ct new file mode 100644 index 000000000..80f0d394b --- /dev/null +++ b/apps/emqx_bridge_es/docker-ct @@ -0,0 +1 @@ +toxiproxy diff --git a/apps/emqx_bridge_es/etc/emqx_bridge_es.conf b/apps/emqx_bridge_es/etc/emqx_bridge_es.conf new file mode 100644 index 000000000..e69de29bb diff --git a/apps/emqx_bridge_es/include/emqx_bridge_es.hrl b/apps/emqx_bridge_es/include/emqx_bridge_es.hrl new file mode 100644 index 000000000..8393cff2b --- /dev/null +++ b/apps/emqx_bridge_es/include/emqx_bridge_es.hrl @@ -0,0 +1,8 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-ifndef(EMQX_BRIDGE_ES_HRL). +-define(EMQX_BRIDGE_ES_HRL, true). + +-endif. diff --git a/apps/emqx_bridge_es/rebar.config b/apps/emqx_bridge_es/rebar.config new file mode 100644 index 000000000..2a3526e08 --- /dev/null +++ b/apps/emqx_bridge_es/rebar.config @@ -0,0 +1,15 @@ +%% -*- mode: erlang -*- + +{erl_opts, [ + debug_info +]}. + +{deps, [ + {emqx, {path, "../../apps/emqx"}}, + {emqx_connector, {path, "../../apps/emqx_connector"}}, + {emqx_resource, {path, "../../apps/emqx_resource"}}, + {emqx_bridge, {path, "../../apps/emqx_bridge"}}, + {emqx_bridge_http, {path, "../emqx_bridge_http"}} +]}. +{plugins, [rebar3_path_deps]}. +{project_plugins, [erlfmt]}. diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es.app.src b/apps/emqx_bridge_es/src/emqx_bridge_es.app.src new file mode 100644 index 000000000..9e98cd33e --- /dev/null +++ b/apps/emqx_bridge_es/src/emqx_bridge_es.app.src @@ -0,0 +1,23 @@ +%% -*- mode: erlang -*- +{application, emqx_bridge_es, [ + {description, "EMQX Enterprise Elastic Search Bridge"}, + {vsn, "0.1.0"}, + {modules, [ + emqx_bridge_es, + emqx_bridge_es_connector + ]}, + {registered, []}, + {applications, [ + kernel, + stdlib, + emqx_resource, + emqx_connector + ]}, + {env, []}, + {licenses, ["Business Source License 1.1"]}, + {maintainers, ["EMQX Team "]}, + {links, [ + {"Homepage", "https://emqx.io/"}, + {"Github", "https://github.com/emqx/emqx"} + ]} +]}. diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es.erl b/apps/emqx_bridge_es/src/emqx_bridge_es.erl new file mode 100644 index 000000000..57ab648b5 --- /dev/null +++ b/apps/emqx_bridge_es/src/emqx_bridge_es.erl @@ -0,0 +1,312 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_es). + +-include("emqx_bridge_es.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). + +-export([bridge_v2_examples/1]). + +%% hocon_schema API +-export([namespace/0, roots/0, fields/1, desc/1]). + +-define(CONNECTOR_TYPE, elasticsearch). +-define(ACTION_TYPE, ?CONNECTOR_TYPE). + +namespace() -> "bridge_elasticsearch". + +roots() -> []. + +fields(action) -> + {elasticsearch, + ?HOCON( + ?MAP(action_name, ?R_REF(action_config)), + #{ + desc => <<"ElasticSearch Action Config">>, + required => false + } + )}; +fields(action_config) -> + emqx_resource_schema:override( + emqx_bridge_v2_schema:make_producer_action_schema( + ?HOCON( + ?R_REF(action_parameters), + #{ + required => true, desc => ?DESC("action_parameters") + } + ) + ), + [ + {resource_opts, + ?HOCON(?R_REF(action_resource_opts), #{ + default => #{}, + desc => ?DESC(emqx_resource_schema, "resource_opts") + })} + ] + ); +fields(action_resource_opts) -> + lists:filter( + fun({K, _V}) -> + not lists:member(K, unsupported_opts()) + end, + emqx_bridge_v2_schema:resource_opts_fields() + ); +fields(action_parameters) -> + [ + {target, + ?HOCON( + binary(), + #{ + desc => ?DESC("config_target"), + required => false + } + )}, + {require_alias, + ?HOCON( + boolean(), + #{ + required => false, + default => false, + desc => ?DESC("config_require_alias") + } + )}, + {routing, + ?HOCON( + binary(), + #{ + required => false, + desc => ?DESC("config_routing") + } + )}, + {wait_for_active_shards, + ?HOCON( + ?UNION([pos_integer(), all]), + #{ + required => false, + desc => ?DESC("config_wait_for_active_shards") + } + )}, + {data, + ?HOCON( + ?ARRAY( + ?UNION( + [ + ?R_REF(create), + ?R_REF(delete), + ?R_REF(index), + ?R_REF(update) + ] + ) + ), + #{ + desc => ?DESC("action_parameters_data") + } + )} + ] ++ + lists:filter( + fun({K, _}) -> + not lists:member(K, [path, method, body, headers, request_timeout]) + end, + emqx_bridge_http_schema:fields("parameters_opts") + ); +fields(Action) when Action =:= create; Action =:= index -> + [ + {action, + ?HOCON( + Action, + #{ + desc => atom_to_binary(Action), + required => true + } + )}, + {'_index', + ?HOCON( + binary(), + #{ + required => false, + desc => ?DESC("config_parameters_index") + } + )}, + {'_id', + ?HOCON( + binary(), + #{ + required => false, + desc => ?DESC("config_parameters_id") + } + )}, + {require_alias, + ?HOCON( + binary(), + #{ + required => false, + desc => ?DESC("config_parameters_require_alias") + } + )}, + {fields, + ?HOCON( + binary(), + #{ + required => true, + desc => ?DESC("config_parameters_fields") + } + )} + ]; +fields(delete) -> + [ + {action, + ?HOCON( + delete, + #{ + desc => <<"Delete">>, + required => true + } + )}, + {'_index', + ?HOCON( + binary(), + #{ + required => false, + desc => ?DESC("config_parameters_index") + } + )}, + {'_id', + ?HOCON( + binary(), + #{ + required => true, + desc => ?DESC("config_parameters_id") + } + )}, + {require_alias, + ?HOCON( + binary(), + #{ + required => false, + desc => ?DESC("config_parameters_require_alias") + } + )} + ]; +fields(update) -> + [ + {action, + ?HOCON( + update, + #{ + desc => <<"Update">>, + required => true + } + )}, + {doc_as_upsert, + ?HOCON( + binary(), + #{ + required => false, + desc => ?DESC("config_parameters_doc_as_upsert") + } + )}, + {upsert, + ?HOCON( + binary(), + #{ + required => false, + desc => ?DESC("config_parameters_upsert") + } + )}, + {'_index', + ?HOCON( + binary(), + #{ + required => false, + desc => ?DESC("config_parameters_index") + } + )}, + {'_id', + ?HOCON( + binary(), + #{ + required => true, + desc => ?DESC("config_parameters_id") + } + )}, + {require_alias, + ?HOCON( + binary(), + #{ + required => false, + desc => ?DESC("config_parameters_require_alias") + } + )}, + {fields, + ?HOCON( + binary(), + #{ + required => true, + desc => ?DESC("config_parameters_fields") + } + )} + ]; +fields("post_bridge_v2") -> + emqx_bridge_schema:type_and_name_fields(elasticsearch) ++ fields(action_config); +fields("put_bridge_v2") -> + fields(action_config); +fields("get_bridge_v2") -> + emqx_bridge_schema:status_fields() ++ fields("post_bridge_v2"). + +bridge_v2_examples(Method) -> + [ + #{ + <<"elasticsearch">> => + #{ + summary => <<"Elastic Search Bridge">>, + value => emqx_bridge_v2_schema:action_values( + Method, ?ACTION_TYPE, ?CONNECTOR_TYPE, action_values() + ) + } + } + ]. + +action_values() -> + #{ + parameters => #{ + target => <<"${target_index}">>, + data => [ + #{ + action => index, + '_index' => <<"${index}">>, + fields => <<"${fields}">>, + require_alias => <<"${require_alias}">> + }, + #{ + action => create, + '_index' => <<"${index}">>, + fields => <<"${fields}">> + }, + #{ + action => delete, + '_index' => <<"${index}">>, + '_id' => <<"${id}">> + }, + #{ + action => update, + '_index' => <<"${index}">>, + '_id' => <<"${id}">>, + fields => <<"${fields}">>, + require_alias => false, + doc_as_upsert => <<"${doc_as_upsert}">>, + upsert => <<"${upsert}">> + } + ] + } + }. + +unsupported_opts() -> + [ + batch_size, + batch_time + ]. + +desc(_) -> undefined. diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es_action_info.erl b/apps/emqx_bridge_es/src/emqx_bridge_es_action_info.erl new file mode 100644 index 000000000..b2f2ff777 --- /dev/null +++ b/apps/emqx_bridge_es/src/emqx_bridge_es_action_info.erl @@ -0,0 +1,22 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_es_action_info). + +-behaviour(emqx_action_info). + +-elvis([{elvis_style, invalid_dynamic_call, disable}]). + +%% behaviour callbacks +-export([ + action_type_name/0, + connector_type_name/0, + schema_module/0 +]). + +-define(ACTION_TYPE, elasticsearch). + +action_type_name() -> ?ACTION_TYPE. +connector_type_name() -> ?ACTION_TYPE. + +schema_module() -> emqx_bridge_es. diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl b/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl new file mode 100644 index 000000000..22509e037 --- /dev/null +++ b/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl @@ -0,0 +1,498 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_es_connector). + +-behaviour(emqx_resource). + +-include("emqx_bridge_es.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +%% `emqx_resource' API +-export([ + callback_mode/0, + on_start/2, + on_stop/2, + on_get_status/2, + on_query/3, + on_query_async/4, + on_add_channel/4, + on_remove_channel/3, + on_get_channels/1, + on_get_channel_status/3 +]). + +-export([ + namespace/0, + roots/0, + fields/1, + desc/1, + connector_examples/1, + connector_example_values/0 +]). + +%% emqx_connector_resource behaviour callbacks +-export([connector_config/2]). + +-type config() :: + #{ + base_url := #{ + scheme := http | https, + host := iolist(), + port := inet:port_number(), + path := _ + }, + connect_timeout := pos_integer(), + pool_type := random | hash, + pool_size := pos_integer(), + request => undefined | map(), + atom() => _ + }. + +-type state() :: + #{ + base_path := _, + connect_timeout := pos_integer(), + pool_type := random | hash, + channels := map(), + request => undefined | map(), + atom() => _ + }. + +-type manager_id() :: binary(). + +-define(CONNECTOR_TYPE, elasticsearch). + +%%------------------------------------------------------------------------------------- +%% connector examples +%%------------------------------------------------------------------------------------- +connector_examples(Method) -> + [ + #{ + <<"elasticsearch">> => + #{ + summary => <<"Elastic Search Connector">>, + value => emqx_connector_schema:connector_values( + Method, ?CONNECTOR_TYPE, connector_example_values() + ) + } + } + ]. + +connector_example_values() -> + #{ + name => <<"elasticsearch_connector">>, + type => elasticsearch, + enable => true, + authentication => #{ + <<"username">> => <<"root">>, + <<"password">> => <<"******">> + }, + base_url => <<"http://127.0.0.1:9200/">>, + connect_timeout => <<"15s">>, + pool_type => <<"random">>, + pool_size => 8, + enable_pipelining => 100, + ssl => #{enable => false} + }. + +%%------------------------------------------------------------------------------------- +%% schema +%%------------------------------------------------------------------------------------- +namespace() -> "elasticsearch". + +roots() -> + [{config, #{type => ?R_REF(config)}}]. + +fields(config) -> + lists:filter( + fun({K, _}) -> not lists:member(K, [url, request, retry_interval, headers]) end, + emqx_bridge_http_schema:fields("config_connector") + ) ++ + fields("connection_fields"); +fields("connection_fields") -> + [ + {base_url, + ?HOCON( + emqx_schema:url(), + #{ + required => true, + desc => ?DESC(emqx_bridge_es, "config_base_url") + } + )}, + {authentication, + ?HOCON( + ?UNION([?R_REF(auth_basic)]), + #{ + desc => ?DESC("config_authentication") + } + )} + ]; +fields(auth_basic) -> + [ + {username, + ?HOCON(binary(), #{ + required => true, + desc => ?DESC("config_auth_basic_username") + })}, + {password, + emqx_schema_secret:mk(#{ + required => true, + desc => ?DESC("config_auth_basic_password") + })} + ]; +fields("post") -> + emqx_connector_schema:type_and_name_fields(elasticsearch) ++ fields(config); +fields("put") -> + fields(config); +fields("get") -> + emqx_bridge_schema:status_fields() ++ fields("post"). + +desc(config) -> + ?DESC("desc_config"); +desc(auth_basic) -> + "Basic Authentication"; +desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> + ["Configuration for Elastic Search using `", string:to_upper(Method), "` method."]; +desc(_) -> + undefined. + +connector_config(Conf, #{name := Name, parse_confs := ParseConfs}) -> + #{ + base_url := BaseUrl, + authentication := + #{ + username := Username, + password := Password0 + } + } = Conf, + + Password = emqx_secret:unwrap(Password0), + Base64 = base64:encode(<>), + BasicToken = <<"Basic ", Base64/binary>>, + + WebhookConfig = + Conf#{ + method => <<"post">>, + url => BaseUrl, + headers => [ + {<<"Content-type">>, <<"application/json">>}, + {<<"Authorization">>, BasicToken} + ] + }, + ParseConfs( + <<"http">>, + Name, + WebhookConfig + ). + +%%------------------------------------------------------------------------------------- +%% `emqx_resource' API +%%------------------------------------------------------------------------------------- +callback_mode() -> async_if_possible. + +-spec on_start(manager_id(), config()) -> {ok, state()} | no_return(). +on_start(InstanceId, Config) -> + case emqx_bridge_http_connector:on_start(InstanceId, Config) of + {ok, State} -> + ?SLOG(info, #{ + msg => "elasticsearch_bridge_started", + instance_id => InstanceId, + request => maps:get(request, State, <<>>) + }), + ?tp(elasticsearch_bridge_started, #{instance_id => InstanceId}), + {ok, State#{channels => #{}}}; + {error, Reason} -> + ?SLOG(error, #{ + msg => "failed_to_start_elasticsearch_bridge", + instance_id => InstanceId, + request => maps:get(request, Config, <<>>), + reason => Reason + }), + throw(failed_to_start_elasticsearch_bridge) + end. + +-spec on_stop(manager_id(), state()) -> ok | {error, term()}. +on_stop(InstanceId, State) -> + ?SLOG(info, #{ + msg => "stopping_elasticsearch_bridge", + connector => InstanceId + }), + Res = emqx_bridge_http_connector:on_stop(InstanceId, State), + ?tp(elasticsearch_bridge_stopped, #{instance_id => InstanceId}), + Res. + +-spec on_get_status(manager_id(), state()) -> + {connected, state()} | {disconnected, state(), term()}. +on_get_status(InstanceId, State) -> + emqx_bridge_http_connector:on_get_status(InstanceId, State). + +-spec on_query(manager_id(), tuple(), state()) -> + {ok, pos_integer(), [term()], term()} + | {ok, pos_integer(), [term()]} + | {error, term()}. +on_query(InstanceId, {ChannelId, Msg} = Req, #{channels := Channels} = State) -> + ?tp(elasticsearch_bridge_on_query, #{instance_id => InstanceId}), + ?SLOG(debug, #{ + msg => "elasticsearch_bridge_on_query_called", + instance_id => InstanceId, + send_message => Req, + state => emqx_utils:redact(State) + }), + case try_render_message(Req, Channels) of + {ok, Body} -> + handle_response( + emqx_bridge_http_connector:on_query( + InstanceId, {ChannelId, {Msg, Body}}, State + ) + ); + Error -> + Error + end. + +-spec on_query_async(manager_id(), tuple(), {function(), [term()]}, state()) -> + {ok, pid()} | {error, empty_request}. +on_query_async( + InstanceId, {ChannelId, Msg} = Req, ReplyFunAndArgs0, #{channels := Channels} = State +) -> + ?tp(elasticsearch_bridge_on_query_async, #{instance_id => InstanceId}), + ?SLOG(debug, #{ + msg => "elasticsearch_bridge_on_query_async_called", + instance_id => InstanceId, + send_message => Req, + state => emqx_utils:redact(State) + }), + case try_render_message(Req, Channels) of + {ok, Payload} -> + ReplyFunAndArgs = + { + fun(Result) -> + Response = handle_response(Result), + emqx_resource:apply_reply_fun(ReplyFunAndArgs0, Response) + end, + [] + }, + emqx_bridge_http_connector:on_query_async( + InstanceId, {ChannelId, {Msg, Payload}}, ReplyFunAndArgs, State + ); + Error -> + Error + end. + +on_add_channel( + InstanceId, + #{channels := Channels} = State0, + ChannelId, + #{parameters := Parameter} +) -> + case maps:is_key(ChannelId, Channels) of + true -> + {error, already_exists}; + _ -> + #{data := Data} = Parameter, + Parameter1 = Parameter#{path => path(Parameter), method => <<"post">>}, + {ok, State} = emqx_bridge_http_connector:on_add_channel( + InstanceId, State0, ChannelId, #{parameters => Parameter1} + ), + case preproc_data_template(Data) of + [] -> + {error, invalid_data}; + DataTemplate -> + Channel = Parameter1#{data => DataTemplate}, + Channels2 = Channels#{ChannelId => Channel}, + {ok, State#{channels => Channels2}} + end + end. + +on_remove_channel(InstanceId, #{channels := Channels} = OldState0, ChannelId) -> + {ok, OldState} = emqx_bridge_http_connector:on_remove_channel(InstanceId, OldState0, ChannelId), + Channels2 = maps:remove(ChannelId, Channels), + {ok, OldState#{channels => Channels2}}. + +on_get_channels(InstanceId) -> + emqx_bridge_v2:get_channels_for_connector(InstanceId). + +on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) -> + case maps:is_key(ChannelId, Channels) of + true -> + connected; + _ -> + {error, not_exists} + end. + +%%-------------------------------------------------------------------- +%% Internal Functions +%%-------------------------------------------------------------------- +path(Param) -> + Target = maps:get(target, Param, undefined), + QString0 = maps:fold( + fun(K, V, Acc) -> + [[atom_to_list(K), "=", to_str(V)] | Acc] + end, + [["_source=false"], ["filter_path=items.*.error"]], + maps:with([require_alias, routing, wait_for_active_shards], Param) + ), + QString = "?" ++ lists:join("&", QString0), + target(Target) ++ QString. + +target(undefined) -> "/_bulk"; +target(Str) -> "/" ++ binary_to_list(Str) ++ "/_bulk". + +to_str(List) when is_list(List) -> List; +to_str(false) -> "false"; +to_str(true) -> "true"; +to_str(Atom) when is_atom(Atom) -> atom_to_list(Atom). + +proc_data(DataList, Msg) when is_list(DataList) -> + [ + begin + proc_data(Data, Msg) + end + || Data <- DataList + ]; +proc_data( + #{ + action := Action, + '_index' := IndexT, + '_id' := IdT, + require_alias := RequiredAliasT, + fields := FieldsT + }, + Msg +) when Action =:= create; Action =:= index -> + [ + emqx_utils_json:encode( + #{ + Action => filter([ + {'_index', emqx_placeholder:proc_tmpl(IndexT, Msg)}, + {'_id', emqx_placeholder:proc_tmpl(IdT, Msg)}, + {required_alias, emqx_placeholder:proc_tmpl(RequiredAliasT, Msg)} + ]) + } + ), + "\n", + emqx_placeholder:proc_tmpl(FieldsT, Msg), + "\n" + ]; +proc_data( + #{ + action := delete, + '_index' := IndexT, + '_id' := IdT, + require_alias := RequiredAliasT + }, + Msg +) -> + [ + emqx_utils_json:encode( + #{ + delete => filter([ + {'_index', emqx_placeholder:proc_tmpl(IndexT, Msg)}, + {'_id', emqx_placeholder:proc_tmpl(IdT, Msg)}, + {required_alias, emqx_placeholder:proc_tmpl(RequiredAliasT, Msg)} + ]) + } + ), + "\n" + ]; +proc_data( + #{ + action := update, + '_index' := IndexT, + '_id' := IdT, + require_alias := RequiredAliasT, + doc_as_upsert := DocAsUpsert, + upsert := Upsert, + fields := FieldsT + }, + Msg +) -> + [ + emqx_utils_json:encode( + #{ + update => filter([ + {'_index', emqx_placeholder:proc_tmpl(IndexT, Msg)}, + {'_id', emqx_placeholder:proc_tmpl(IdT, Msg)}, + {required_alias, emqx_placeholder:proc_tmpl(RequiredAliasT, Msg)}, + {doc_as_upsert, emqx_placeholder:proc_tmpl(DocAsUpsert, Msg)}, + {upsert, emqx_placeholder:proc_tmpl(Upsert, Msg)} + ]) + } + ), + "\n{\"doc\":", + emqx_placeholder:proc_tmpl(FieldsT, Msg), + "}\n" + ]. + +filter(List) -> + Fun = fun + ({_K, V}) when V =:= undefined; V =:= <<"undefined">>; V =:= "undefined" -> + false; + ({_K, V}) when V =:= ""; V =:= <<>> -> + false; + ({_K, V}) when V =:= "false" -> {true, false}; + ({_K, V}) when V =:= "true" -> {true, true}; + ({_K, _V}) -> + true + end, + maps:from_list(lists:filtermap(Fun, List)). + +handle_response({ok, 200, _Headers, Body} = Resp) -> + eval_response_body(Body, Resp); +handle_response({ok, 200, Body} = Resp) -> + eval_response_body(Body, Resp); +handle_response({ok, Code, _Headers, Body}) -> + {error, #{code => Code, body => Body}}; +handle_response({ok, Code, Body}) -> + {error, #{code => Code, body => Body}}; +handle_response({error, _} = Error) -> + Error. + +eval_response_body(<<"{}">>, Resp) -> Resp; +eval_response_body(Body, _Resp) -> {error, emqx_utils_json:decode(Body)}. + +preproc_data_template(DataList) when is_list(DataList) -> + [ + begin + preproc_data_template(Data) + end + || Data <- DataList + ]; +preproc_data_template(#{action := create} = Data) -> + Index = maps:get('_index', Data, ""), + Id = maps:get('_id', Data, ""), + RequiredAlias = maps:get(require_alias, Data, ""), + Fields = maps:get(fields, Data, ""), + #{ + action => create, + '_index' => emqx_placeholder:preproc_tmpl(Index), + '_id' => emqx_placeholder:preproc_tmpl(Id), + require_alias => emqx_placeholder:preproc_tmpl(RequiredAlias), + fields => emqx_placeholder:preproc_tmpl(Fields) + }; +preproc_data_template(#{action := index} = Data) -> + Data1 = preproc_data_template(Data#{action => create}), + Data1#{action => index}; +preproc_data_template(#{action := delete} = Data) -> + Data1 = preproc_data_template(Data#{action => create}), + Data2 = Data1#{action => delete}, + maps:remove(fields, Data2); +preproc_data_template(#{action := update} = Data) -> + Data1 = preproc_data_template(Data#{action => index}), + DocAsUpsert = maps:get(doc_as_upsert, Data, ""), + Upsert = maps:get(upsert, Data, ""), + Data1#{ + action => update, + doc_as_upsert => emqx_placeholder:preproc_tmpl(DocAsUpsert), + upsert => emqx_placeholder:preproc_tmpl(Upsert) + }. + +try_render_message({ChannelId, Msg}, Channels) -> + case maps:find(ChannelId, Channels) of + {ok, #{data := Data}} -> + {ok, proc_data(Data, Msg)}; + _ -> + {error, {unrecoverable_error, {invalid_channel_id, ChannelId}}} + end. diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl index 51375fc04..f00ae8523 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl @@ -317,7 +317,7 @@ on_query(InstId, {send_message, Msg}, State) -> %% BridgeV2 entrypoint on_query( InstId, - {ActionId, Msg}, + {ActionId, MsgAndBody}, State = #{installed_actions := InstalledActions} ) when is_binary(ActionId) -> case {maps:get(request, State, undefined), maps:get(ActionId, InstalledActions, undefined)} of @@ -334,10 +334,10 @@ on_query( body := Body, headers := Headers, request_timeout := Timeout - } = process_request_and_action(Request, ActionState, Msg), + } = process_request_and_action(Request, ActionState, MsgAndBody), %% bridge buffer worker has retry, do not let ehttpc retry Retry = 2, - ClientId = maps:get(clientid, Msg, undefined), + ClientId = clientid(MsgAndBody), on_query( InstId, {ClientId, Method, {Path, Headers, Body}, Timeout, Retry}, @@ -430,7 +430,7 @@ on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) -> %% BridgeV2 entrypoint on_query_async( InstId, - {ActionId, Msg}, + {ActionId, MsgAndBody}, ReplyFunAndArgs, State = #{installed_actions := InstalledActions} ) when is_binary(ActionId) -> @@ -448,8 +448,8 @@ on_query_async( body := Body, headers := Headers, request_timeout := Timeout - } = process_request_and_action(Request, ActionState, Msg), - ClientId = maps:get(clientid, Msg, undefined), + } = process_request_and_action(Request, ActionState, MsgAndBody), + ClientId = clientid(MsgAndBody), on_query_async( InstId, {ClientId, Method, {Path, Headers, Body}, Timeout}, @@ -629,12 +629,9 @@ maybe_parse_template(Key, Conf) -> parse_template(String) -> emqx_template:parse(String). -process_request_and_action(Request, ActionState, Msg) -> +process_request_and_action(Request, ActionState, {Msg, Body}) -> MethodTemplate = maps:get(method, ActionState), Method = make_method(render_template_string(MethodTemplate, Msg)), - BodyTemplate = maps:get(body, ActionState), - Body = render_request_body(BodyTemplate, Msg), - PathPrefix = unicode:characters_to_list(render_template(maps:get(path, Request), Msg)), PathSuffix = unicode:characters_to_list(render_template(maps:get(path, ActionState), Msg)), @@ -656,7 +653,11 @@ process_request_and_action(Request, ActionState, Msg) -> body => Body, headers => Headers, request_timeout => maps:get(request_timeout, ActionState) - }. + }; +process_request_and_action(Request, ActionState, Msg) -> + BodyTemplate = maps:get(body, ActionState), + Body = render_request_body(BodyTemplate, Msg), + process_request_and_action(Request, ActionState, {Msg, Body}). merge_proplist(Proplist1, Proplist2) -> lists:foldl( @@ -732,7 +733,7 @@ formalize_request(_Method, BasePath, {Path, Headers}) -> %% because an HTTP server may handle paths like %% "/a/b/c/", "/a/b/c" and "/a//b/c" differently. %% -%% So we try to avoid unneccessary path normalization. +%% So we try to avoid unnecessary path normalization. %% %% See also: `join_paths_test_/0` join_paths(Path1, Path2) -> @@ -876,6 +877,9 @@ redact_request({Path, Headers}) -> redact_request({Path, Headers, _Body}) -> {Path, Headers, <<"******">>}. +clientid({Msg, _Body}) -> clientid(Msg); +clientid(Msg) -> maps:get(clientid, Msg, undefined). + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl index 4286a59e4..7fbcfc6db 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl @@ -90,7 +90,7 @@ connector_example_values() -> enable => true, authentication => #{ <<"username">> => <<"root">>, - <<"password">> => <<"*****">> + <<"password">> => <<"******">> }, base_url => <<"http://iotdb.local:18080/">>, connect_timeout => <<"15s">>, @@ -109,7 +109,10 @@ roots() -> [{config, #{type => hoconsc:ref(?MODULE, config)}}]. fields(config) -> - proplists_without([url, headers], emqx_bridge_http_schema:fields("config_connector")) ++ + proplists_without( + [url, request, retry_interval, headers], + emqx_bridge_http_schema:fields("config_connector") + ) ++ fields("connection_fields"); fields("connection_fields") -> [ @@ -206,7 +209,7 @@ on_start(InstanceId, Config) -> ?SLOG(error, #{ msg => "failed_to_start_iotdb_bridge", instance_id => InstanceId, - base_url => maps:get(request, Config, <<>>), + request => maps:get(request, Config, <<>>), reason => Reason }), throw(failed_to_start_iotdb_bridge) diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index 9e35015b0..822e8429b 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -50,6 +50,8 @@ resource_type(redis) -> emqx_bridge_redis_connector; resource_type(iotdb) -> emqx_bridge_iotdb_connector; +resource_type(elasticsearch) -> + emqx_bridge_es_connector; resource_type(Type) -> error({unknown_connector_type, Type}). @@ -62,6 +64,8 @@ connector_impl_module(confluent_producer) -> emqx_bridge_confluent_producer; connector_impl_module(iotdb) -> emqx_bridge_iotdb_connector; +connector_impl_module(elasticsearch) -> + emqx_bridge_es_connector; connector_impl_module(_ConnectorType) -> undefined. @@ -181,6 +185,14 @@ connector_structs() -> desc => <<"IoTDB Connector Config">>, required => false } + )}, + {elasticsearch, + mk( + hoconsc:map(name, ref(emqx_bridge_es_connector, config)), + #{ + desc => <<"Elastis Search Connector Config">>, + required => false + } )} ]. @@ -199,7 +211,8 @@ schema_modules() -> emqx_bridge_timescale, emqx_postgresql_connector_schema, emqx_bridge_redis_schema, - emqx_bridge_iotdb_connector + emqx_bridge_iotdb_connector, + emqx_bridge_es_connector ]. api_schemas(Method) -> @@ -227,7 +240,8 @@ api_schemas(Method) -> api_ref(emqx_bridge_timescale, <<"timescale">>, Method ++ "_connector"), api_ref(emqx_postgresql_connector_schema, <<"pgsql">>, Method ++ "_connector"), api_ref(emqx_bridge_redis_schema, <<"redis">>, Method ++ "_connector"), - api_ref(emqx_bridge_iotdb_connector, <<"iotdb">>, Method) + api_ref(emqx_bridge_iotdb_connector, <<"iotdb">>, Method), + api_ref(emqx_bridge_es_connector, <<"elasticsearch">>, Method) ]. api_ref(Module, Type, Method) -> diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index 0a3c9d744..b043ebacd 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -147,7 +147,9 @@ connector_type_to_bridge_types(syskeeper_proxy) -> connector_type_to_bridge_types(timescale) -> [timescale]; connector_type_to_bridge_types(iotdb) -> - [iotdb]. + [iotdb]; +connector_type_to_bridge_types(elasticsearch) -> + [elasticsearch]. actions_config_name() -> <<"actions">>. diff --git a/apps/emqx_machine/priv/reboot_lists.eterm b/apps/emqx_machine/priv/reboot_lists.eterm index cf3ad1523..f7e78c360 100644 --- a/apps/emqx_machine/priv/reboot_lists.eterm +++ b/apps/emqx_machine/priv/reboot_lists.eterm @@ -99,6 +99,7 @@ emqx_bridge_hstreamdb, emqx_bridge_influxdb, emqx_bridge_iotdb, + emqx_bridge_es, emqx_bridge_matrix, emqx_bridge_mongodb, emqx_bridge_mysql, diff --git a/apps/emqx_machine/src/emqx_machine.app.src b/apps/emqx_machine/src/emqx_machine.app.src index 6d7012313..320ddea02 100644 --- a/apps/emqx_machine/src/emqx_machine.app.src +++ b/apps/emqx_machine/src/emqx_machine.app.src @@ -3,7 +3,7 @@ {id, "emqx_machine"}, {description, "The EMQX Machine"}, % strict semver, bump manually! - {vsn, "0.2.17"}, + {vsn, "0.2.18"}, {modules, []}, {registered, []}, {applications, [kernel, stdlib, emqx_ctl]}, diff --git a/mix.exs b/mix.exs index de8195cfb..66889cebf 100644 --- a/mix.exs +++ b/mix.exs @@ -164,6 +164,7 @@ defmodule EMQXUmbrella.MixProject do :emqx_bridge_hstreamdb, :emqx_bridge_influxdb, :emqx_bridge_iotdb, + :emqx_bridge_es, :emqx_bridge_matrix, :emqx_bridge_mongodb, :emqx_bridge_mysql, diff --git a/rel/i18n/emqx_bridge_es.hocon b/rel/i18n/emqx_bridge_es.hocon new file mode 100644 index 000000000..78299c4ee --- /dev/null +++ b/rel/i18n/emqx_bridge_es.hocon @@ -0,0 +1,129 @@ +emqx_bridge_es { + +config_enable.desc: +"""Enable or disable this bridge""" + +config_enable.label: +"""Enable Or Disable Bridge""" + +config_authentication.desc: +"""Authentication configuration""" + +config_authentication.label: +"""Authentication""" + +auth_basic.desc: +"""Parameters for basic authentication.""" + +auth_basic.label: +"""Basic auth params""" + +config_auth_basic_username.desc: +"""The username as configured at the IoTDB REST interface""" + +config_auth_basic_username.label: + """HTTP Basic Auth Username""" + +config_auth_basic_password.desc: +"""The password as configured at the IoTDB REST interface""" + +config_auth_basic_password.label: +"""HTTP Basic Auth Password""" + +config_base_url.desc: +"""The base URL of the external ElasticSearch service's REST interface.""" +config_base_url.label: +"""ElasticSearch REST Service Base URL""" + +config_target.desc: +"""Name of the data stream, index, or index alias to perform bulk actions on""" + +config_target.label: +"""Target""" + +config_require_alias.desc: +"""If true, the request’s actions must target an index alias. Defaults to false""" +config_require_alias.label: +"""Require Alias""" + +config_routing.desc: +"""Custom value used to route operations to a specific shard.""" +config_routing.label: +"""Routing""" + +config_wait_for_active_shards.desc: +"""The number of shard copies that must be active before proceeding with the operation. +Set to all or any positive integer up to the total number of shards in the index (number_of_replicas+1). +Default: 1, the primary shard""" + +config_max_retries.desc: +"""HTTP request max retry times if failed.""" + +config_max_retries.label: +"""HTTP Request Max Retries""" + +desc_config.desc: +"""Configuration for Apache IoTDB bridge.""" + +desc_config.label: +"""IoTDB Bridge Configuration""" + +desc_name.desc: +"""Bridge name, used as a human-readable description of the bridge.""" + +desc_name.label: +"""Bridge Name""" + +config_parameters_action.desc: +"""TODO""" + +config_parameters_action.label: +"""Action""" + +config_parameters_index.desc: +"""Name of the data stream, index, or index alias to perform the action on. +This parameter is required if a is not specified in the request path.""" + +config_parameters_index.label: +"""_index""" + +config_parameters_id.desc: +"""The document ID. If no ID is specified, a document ID is automatically generated.""" +config_parameters_id.label: +"""_id""" + +config_parameters_require_alias.desc: +"""If true, the action must target an index alias. Defaults to false.""" +config_parameters_require_alias.label: +"""_require_alias""" + +config_parameters_fields.desc: +"""The document source to index. Required for create and index operations.""" +config_parameters_fields.label: +"""fields""" + +config_parameters_doc_as_upsert.desc: +"""Instead of sending a partial doc plus an upsert doc, you can set doc_as_upsert to true +to use the contents of doc as the upsert value.""" +config_parameters_doc_as_upsert.label: +"""doc_as_upsert""" + +config_parameters_upsert.desc: +"""If the document does not already exist, the contents of the upsert element are inserted as a new document.""" +config_parameters_upsert.label: +"""upsert""" + + +action_parameters_data.desc: +"""ElasticSearch action parameter data""" + +action_parameters_data.label: +"""Parameter Data""" + +action_parameters.desc: +"""ElasticSearch action parameters""" + +action_parameters.label: +"""Parameters""" + +} diff --git a/rel/i18n/emqx_bridge_es_connector.hocon b/rel/i18n/emqx_bridge_es_connector.hocon new file mode 100644 index 000000000..f980b3aca --- /dev/null +++ b/rel/i18n/emqx_bridge_es_connector.hocon @@ -0,0 +1,44 @@ +emqx_bridge_es_connector { + +config_authentication.desc: +"""Authentication configuration""" + +config_authentication.label: +"""Authentication""" + +auth_basic.desc: +"""Parameters for basic authentication.""" + +auth_basic.label: +"""Basic auth params""" + +config_auth_basic_username.desc: +"""The username as configured at the ElasticSearch REST interface""" + +config_auth_basic_username.label: + """HTTP Basic Auth Username""" + +config_auth_basic_password.desc: +"""The password as configured at the ElasticSearch REST interface""" + +config_auth_basic_password.label: +"""HTTP Basic Auth Password""" + +config_base_url.desc: +"""The base URL of the external ElasticSearch service's REST interface.""" +config_base_url.label: +"""ElasticSearch REST Service Base URL""" + +config_max_retries.desc: +"""HTTP request max retry times if failed.""" + +config_max_retries.label: +"""HTTP Request Max Retries""" + +desc_config.desc: +"""Configuration for ElasticSearch bridge.""" + +desc_config.label: +"""ElasticSearch Bridge Configuration""" + +} From ace443fc18263853aed8773339b492e8e552a032 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Fri, 12 Jan 2024 14:15:30 +0800 Subject: [PATCH 2/2] refactor: refactor es's action --- .../src/schema/emqx_bridge_v2_schema.erl | 15 +- apps/emqx_bridge_es/.gitignore | 19 - apps/emqx_bridge_es/src/emqx_bridge_es.erl | 361 +++++++----------- .../src/emqx_bridge_es_connector.erl | 263 ++++--------- .../src/emqx_bridge_http_connector.erl | 23 +- .../src/schema/emqx_connector_ee_schema.erl | 2 +- rel/i18n/emqx_bridge_es.hocon | 75 ++-- rel/i18n/emqx_bridge_es_connector.hocon | 5 - scripts/spellcheck/dicts/emqx.txt | 2 + 9 files changed, 275 insertions(+), 490 deletions(-) delete mode 100644 apps/emqx_bridge_es/.gitignore diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl index 74239ffc0..aa06b547d 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl @@ -361,11 +361,16 @@ is_bad_schema(#{type := ?MAP(_, ?R_REF(Module, TypeName))}) -> [] -> false; _ -> - {true, #{ - schema_module => Module, - type_name => TypeName, - missing_fields => MissingFields - }} + %% elasticsearch is new and doesn't have local_topic + case MissingFields of + [local_topic] when Module =:= emqx_bridge_es -> false; + _ -> + {true, #{ + schema_module => Module, + type_name => TypeName, + missing_fields => MissingFields + }} + end end. -endif. diff --git a/apps/emqx_bridge_es/.gitignore b/apps/emqx_bridge_es/.gitignore deleted file mode 100644 index e9bc1c544..000000000 --- a/apps/emqx_bridge_es/.gitignore +++ /dev/null @@ -1,19 +0,0 @@ -.rebar3 - _* - .eunit - *.o - *.beam - *.plt - *.swp - *.swo - .erlang.cookie - ebin - log - erl_crash.dump - .rebar - logs - _build - .idea - *.iml - rebar3.crashdump - *~ diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es.erl b/apps/emqx_bridge_es/src/emqx_bridge_es.erl index 57ab648b5..b575f32ed 100644 --- a/apps/emqx_bridge_es/src/emqx_bridge_es.erl +++ b/apps/emqx_bridge_es/src/emqx_bridge_es.erl @@ -25,15 +25,15 @@ fields(action) -> ?HOCON( ?MAP(action_name, ?R_REF(action_config)), #{ - desc => <<"ElasticSearch Action Config">>, - required => false + required => false, + desc => ?DESC(elasticsearch) } )}; fields(action_config) -> emqx_resource_schema:override( - emqx_bridge_v2_schema:make_producer_action_schema( + emqx_bridge_v2_schema:make_consumer_action_schema( ?HOCON( - ?R_REF(action_parameters), + ?UNION(fun action_union_member_selector/1), #{ required => true, desc => ?DESC("action_parameters") } @@ -54,200 +54,28 @@ fields(action_resource_opts) -> end, emqx_bridge_v2_schema:resource_opts_fields() ); -fields(action_parameters) -> +fields(action_create) -> [ - {target, - ?HOCON( - binary(), - #{ - desc => ?DESC("config_target"), - required => false - } - )}, - {require_alias, - ?HOCON( - boolean(), - #{ - required => false, - default => false, - desc => ?DESC("config_require_alias") - } - )}, - {routing, - ?HOCON( - binary(), - #{ - required => false, - desc => ?DESC("config_routing") - } - )}, - {wait_for_active_shards, - ?HOCON( - ?UNION([pos_integer(), all]), - #{ - required => false, - desc => ?DESC("config_wait_for_active_shards") - } - )}, - {data, - ?HOCON( - ?ARRAY( - ?UNION( - [ - ?R_REF(create), - ?R_REF(delete), - ?R_REF(index), - ?R_REF(update) - ] - ) - ), - #{ - desc => ?DESC("action_parameters_data") - } - )} - ] ++ - lists:filter( - fun({K, _}) -> - not lists:member(K, [path, method, body, headers, request_timeout]) - end, - emqx_bridge_http_schema:fields("parameters_opts") - ); -fields(Action) when Action =:= create; Action =:= index -> - [ - {action, - ?HOCON( - Action, - #{ - desc => atom_to_binary(Action), - required => true - } - )}, - {'_index', - ?HOCON( - binary(), - #{ - required => false, - desc => ?DESC("config_parameters_index") - } - )}, - {'_id', - ?HOCON( - binary(), - #{ - required => false, - desc => ?DESC("config_parameters_id") - } - )}, - {require_alias, - ?HOCON( - binary(), - #{ - required => false, - desc => ?DESC("config_parameters_require_alias") - } - )}, - {fields, - ?HOCON( - binary(), - #{ - required => true, - desc => ?DESC("config_parameters_fields") - } - )} + action(create), + index(), + id(false), + doc(true), + routing(), + require_alias(), + overwrite() + | http_common_opts() ]; -fields(delete) -> +fields(action_delete) -> + [action(delete), index(), id(true), routing() | http_common_opts()]; +fields(action_update) -> [ - {action, - ?HOCON( - delete, - #{ - desc => <<"Delete">>, - required => true - } - )}, - {'_index', - ?HOCON( - binary(), - #{ - required => false, - desc => ?DESC("config_parameters_index") - } - )}, - {'_id', - ?HOCON( - binary(), - #{ - required => true, - desc => ?DESC("config_parameters_id") - } - )}, - {require_alias, - ?HOCON( - binary(), - #{ - required => false, - desc => ?DESC("config_parameters_require_alias") - } - )} - ]; -fields(update) -> - [ - {action, - ?HOCON( - update, - #{ - desc => <<"Update">>, - required => true - } - )}, - {doc_as_upsert, - ?HOCON( - binary(), - #{ - required => false, - desc => ?DESC("config_parameters_doc_as_upsert") - } - )}, - {upsert, - ?HOCON( - binary(), - #{ - required => false, - desc => ?DESC("config_parameters_upsert") - } - )}, - {'_index', - ?HOCON( - binary(), - #{ - required => false, - desc => ?DESC("config_parameters_index") - } - )}, - {'_id', - ?HOCON( - binary(), - #{ - required => true, - desc => ?DESC("config_parameters_id") - } - )}, - {require_alias, - ?HOCON( - binary(), - #{ - required => false, - desc => ?DESC("config_parameters_require_alias") - } - )}, - {fields, - ?HOCON( - binary(), - #{ - required => true, - desc => ?DESC("config_parameters_fields") - } - )} + action(update), + index(), + id(true), + doc(true), + routing(), + require_alias() + | http_common_opts() ]; fields("post_bridge_v2") -> emqx_bridge_schema:type_and_name_fields(elasticsearch) ++ fields(action_config); @@ -256,6 +84,111 @@ fields("put_bridge_v2") -> fields("get_bridge_v2") -> emqx_bridge_schema:status_fields() ++ fields("post_bridge_v2"). +action_union_member_selector(all_union_members) -> + [ + ?R_REF(action_create), + ?R_REF(action_delete), + ?R_REF(action_update) + ]; +action_union_member_selector({value, Value}) -> + case Value of + #{<<"action">> := <<"create">>} -> + [?R_REF(action_create)]; + #{<<"action">> := <<"delete">>} -> + [?R_REF(action_delete)]; + #{<<"action">> := <<"update">>} -> + [?R_REF(action_update)]; + _ -> + Expected = "create | delete | update", + throw(#{ + field_name => action, + expected => Expected + }) + end. + +action(Action) -> + {action, + ?HOCON( + Action, + #{ + required => true, + desc => atom_to_binary(Action) + } + )}. + +overwrite() -> + {overwrite, + ?HOCON( + boolean(), + #{ + required => false, + default => true, + desc => ?DESC("config_overwrite") + } + )}. + +index() -> + {index, + ?HOCON( + binary(), + #{ + required => true, + example => <<"${payload.index}">>, + desc => ?DESC("config_parameters_index") + } + )}. + +id(Required) -> + {id, + ?HOCON( + binary(), + #{ + required => Required, + example => <<"${payload.id}">>, + desc => ?DESC("config_parameters_id") + } + )}. + +doc(Required) -> + {doc, + ?HOCON( + binary(), + #{ + required => Required, + example => <<"${payload.doc}">>, + desc => ?DESC("config_parameters_doc") + } + )}. + +http_common_opts() -> + lists:filter( + fun({K, _}) -> + not lists:member(K, [path, method, body, headers, request_timeout]) + end, + emqx_bridge_http_schema:fields("parameters_opts") + ). + +routing() -> + {routing, + ?HOCON( + binary(), + #{ + required => false, + example => <<"${payload.routing}">>, + desc => ?DESC("config_routing") + } + )}. + +require_alias() -> + {require_alias, + ?HOCON( + boolean(), + #{ + required => false, + desc => ?DESC("config_require_alias") + } + )}. + bridge_v2_examples(Method) -> [ #{ @@ -272,34 +205,10 @@ bridge_v2_examples(Method) -> action_values() -> #{ parameters => #{ - target => <<"${target_index}">>, - data => [ - #{ - action => index, - '_index' => <<"${index}">>, - fields => <<"${fields}">>, - require_alias => <<"${require_alias}">> - }, - #{ - action => create, - '_index' => <<"${index}">>, - fields => <<"${fields}">> - }, - #{ - action => delete, - '_index' => <<"${index}">>, - '_id' => <<"${id}">> - }, - #{ - action => update, - '_index' => <<"${index}">>, - '_id' => <<"${id}">>, - fields => <<"${fields}">>, - require_alias => false, - doc_as_upsert => <<"${doc_as_upsert}">>, - upsert => <<"${upsert}">> - } - ] + action => create, + index => <<"${payload.index}">>, + overwrite => true, + doc => <<"${payload.doc}">> } }. @@ -309,4 +218,10 @@ unsupported_opts() -> batch_time ]. +desc(elasticsearch) -> ?DESC(elasticsearch); +desc(action_config) -> ?DESC(action_config); +desc(action_create) -> ?DESC(action_create); +desc(action_delete) -> ?DESC(action_delete); +desc(action_update) -> ?DESC(action_update); +desc(action_resource_opts) -> ?DESC(action_resource_opts); desc(_) -> undefined. diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl b/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl index 22509e037..fe86eac56 100644 --- a/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl +++ b/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl @@ -233,7 +233,7 @@ on_get_status(InstanceId, State) -> {ok, pos_integer(), [term()], term()} | {ok, pos_integer(), [term()]} | {error, term()}. -on_query(InstanceId, {ChannelId, Msg} = Req, #{channels := Channels} = State) -> +on_query(InstanceId, {ChannelId, Msg} = Req, State) -> ?tp(elasticsearch_bridge_on_query, #{instance_id => InstanceId}), ?SLOG(debug, #{ msg => "elasticsearch_bridge_on_query_called", @@ -241,21 +241,16 @@ on_query(InstanceId, {ChannelId, Msg} = Req, #{channels := Channels} = State) -> send_message => Req, state => emqx_utils:redact(State) }), - case try_render_message(Req, Channels) of - {ok, Body} -> - handle_response( - emqx_bridge_http_connector:on_query( - InstanceId, {ChannelId, {Msg, Body}}, State - ) - ); - Error -> - Error - end. + handle_response( + emqx_bridge_http_connector:on_query( + InstanceId, {ChannelId, Msg}, State + ) + ). -spec on_query_async(manager_id(), tuple(), {function(), [term()]}, state()) -> {ok, pid()} | {error, empty_request}. on_query_async( - InstanceId, {ChannelId, Msg} = Req, ReplyFunAndArgs0, #{channels := Channels} = State + InstanceId, {ChannelId, Msg} = Req, ReplyFunAndArgs0, State ) -> ?tp(elasticsearch_bridge_on_query_async, #{instance_id => InstanceId}), ?SLOG(debug, #{ @@ -264,22 +259,17 @@ on_query_async( send_message => Req, state => emqx_utils:redact(State) }), - case try_render_message(Req, Channels) of - {ok, Payload} -> - ReplyFunAndArgs = - { - fun(Result) -> - Response = handle_response(Result), - emqx_resource:apply_reply_fun(ReplyFunAndArgs0, Response) - end, - [] - }, - emqx_bridge_http_connector:on_query_async( - InstanceId, {ChannelId, {Msg, Payload}}, ReplyFunAndArgs, State - ); - Error -> - Error - end. + ReplyFunAndArgs = + { + fun(Result) -> + Response = handle_response(Result), + emqx_resource:apply_reply_fun(ReplyFunAndArgs0, Response) + end, + [] + }, + emqx_bridge_http_connector:on_query_async( + InstanceId, {ChannelId, Msg}, ReplyFunAndArgs, State + ). on_add_channel( InstanceId, @@ -291,19 +281,17 @@ on_add_channel( true -> {error, already_exists}; _ -> - #{data := Data} = Parameter, - Parameter1 = Parameter#{path => path(Parameter), method => <<"post">>}, + Parameter1 = Parameter#{ + path => path(Parameter), + method => method(Parameter), + body => get_body_template(Parameter) + }, {ok, State} = emqx_bridge_http_connector:on_add_channel( InstanceId, State0, ChannelId, #{parameters => Parameter1} ), - case preproc_data_template(Data) of - [] -> - {error, invalid_data}; - DataTemplate -> - Channel = Parameter1#{data => DataTemplate}, - Channels2 = Channels#{ChannelId => Channel}, - {ok, State#{channels => Channels2}} - end + Channel = Parameter1, + Channels2 = Channels#{ChannelId => Channel}, + {ok, State#{channels => Channels2}} end. on_remove_channel(InstanceId, #{channels := Channels} = OldState0, ChannelId) -> @@ -325,124 +313,55 @@ on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) -> %%-------------------------------------------------------------------- %% Internal Functions %%-------------------------------------------------------------------- -path(Param) -> - Target = maps:get(target, Param, undefined), - QString0 = maps:fold( - fun(K, V, Acc) -> - [[atom_to_list(K), "=", to_str(V)] | Acc] +%% delete DELETE //_doc/<_id> +path(#{action := delete, id := Id, index := Index} = Action) -> + BasePath = ["/", Index, "/_doc/", Id], + Qs = add_query_string([routing], Action), + BasePath ++ Qs; +%% update POST //_update/<_id> +path(#{action := update, id := Id, index := Index} = Action) -> + BasePath = ["/", Index, "/_update/", Id], + Qs = add_query_string([routing, require_alias], Action), + BasePath ++ Qs; +%% create with id //_doc/_id +path(#{action := create, index := Index, id := Id} = Action) -> + BasePath = ["/", Index, "/_doc/", Id], + Qs = + case maps:get(overwrite, Action, true) of + true -> + add_query_string([routing, require_alias], Action); + false -> + Action1 = Action#{op_type => "create"}, + add_query_string([routing, require_alias, op_type], Action1) end, - [["_source=false"], ["filter_path=items.*.error"]], - maps:with([require_alias, routing, wait_for_active_shards], Param) - ), - QString = "?" ++ lists:join("&", QString0), - target(Target) ++ QString. + BasePath ++ Qs; +%% create without id POST //_doc/ +path(#{action := create, index := Index} = Action) -> + BasePath = ["/", Index, "/_doc/"], + Qs = add_query_string([routing, require_alias], Action), + BasePath ++ Qs. -target(undefined) -> "/_bulk"; -target(Str) -> "/" ++ binary_to_list(Str) ++ "/_bulk". +method(#{action := create}) -> <<"POST">>; +method(#{action := delete}) -> <<"DELETE">>; +method(#{action := update}) -> <<"POST">>. + +add_query_string(Keys, Param0) -> + Param1 = maps:with(Keys, Param0), + FoldFun = fun(K, V, Acc) -> [[atom_to_list(K), "=", to_str(V)] | Acc] end, + case maps:fold(FoldFun, [], Param1) of + "" -> ""; + QString -> "?" ++ lists:join("&", QString) + end. to_str(List) when is_list(List) -> List; to_str(false) -> "false"; to_str(true) -> "true"; to_str(Atom) when is_atom(Atom) -> atom_to_list(Atom). -proc_data(DataList, Msg) when is_list(DataList) -> - [ - begin - proc_data(Data, Msg) - end - || Data <- DataList - ]; -proc_data( - #{ - action := Action, - '_index' := IndexT, - '_id' := IdT, - require_alias := RequiredAliasT, - fields := FieldsT - }, - Msg -) when Action =:= create; Action =:= index -> - [ - emqx_utils_json:encode( - #{ - Action => filter([ - {'_index', emqx_placeholder:proc_tmpl(IndexT, Msg)}, - {'_id', emqx_placeholder:proc_tmpl(IdT, Msg)}, - {required_alias, emqx_placeholder:proc_tmpl(RequiredAliasT, Msg)} - ]) - } - ), - "\n", - emqx_placeholder:proc_tmpl(FieldsT, Msg), - "\n" - ]; -proc_data( - #{ - action := delete, - '_index' := IndexT, - '_id' := IdT, - require_alias := RequiredAliasT - }, - Msg -) -> - [ - emqx_utils_json:encode( - #{ - delete => filter([ - {'_index', emqx_placeholder:proc_tmpl(IndexT, Msg)}, - {'_id', emqx_placeholder:proc_tmpl(IdT, Msg)}, - {required_alias, emqx_placeholder:proc_tmpl(RequiredAliasT, Msg)} - ]) - } - ), - "\n" - ]; -proc_data( - #{ - action := update, - '_index' := IndexT, - '_id' := IdT, - require_alias := RequiredAliasT, - doc_as_upsert := DocAsUpsert, - upsert := Upsert, - fields := FieldsT - }, - Msg -) -> - [ - emqx_utils_json:encode( - #{ - update => filter([ - {'_index', emqx_placeholder:proc_tmpl(IndexT, Msg)}, - {'_id', emqx_placeholder:proc_tmpl(IdT, Msg)}, - {required_alias, emqx_placeholder:proc_tmpl(RequiredAliasT, Msg)}, - {doc_as_upsert, emqx_placeholder:proc_tmpl(DocAsUpsert, Msg)}, - {upsert, emqx_placeholder:proc_tmpl(Upsert, Msg)} - ]) - } - ), - "\n{\"doc\":", - emqx_placeholder:proc_tmpl(FieldsT, Msg), - "}\n" - ]. - -filter(List) -> - Fun = fun - ({_K, V}) when V =:= undefined; V =:= <<"undefined">>; V =:= "undefined" -> - false; - ({_K, V}) when V =:= ""; V =:= <<>> -> - false; - ({_K, V}) when V =:= "false" -> {true, false}; - ({_K, V}) when V =:= "true" -> {true, true}; - ({_K, _V}) -> - true - end, - maps:from_list(lists:filtermap(Fun, List)). - -handle_response({ok, 200, _Headers, Body} = Resp) -> - eval_response_body(Body, Resp); -handle_response({ok, 200, Body} = Resp) -> - eval_response_body(Body, Resp); +handle_response({ok, Code, _Headers, _Body} = Resp) when Code =:= 200; Code =:= 201 -> + Resp; +handle_response({ok, Code, _Body} = Resp) when Code =:= 200; Code =:= 201 -> + Resp; handle_response({ok, Code, _Headers, Body}) -> {error, #{code => Code, body => Body}}; handle_response({ok, Code, Body}) -> @@ -450,49 +369,5 @@ handle_response({ok, Code, Body}) -> handle_response({error, _} = Error) -> Error. -eval_response_body(<<"{}">>, Resp) -> Resp; -eval_response_body(Body, _Resp) -> {error, emqx_utils_json:decode(Body)}. - -preproc_data_template(DataList) when is_list(DataList) -> - [ - begin - preproc_data_template(Data) - end - || Data <- DataList - ]; -preproc_data_template(#{action := create} = Data) -> - Index = maps:get('_index', Data, ""), - Id = maps:get('_id', Data, ""), - RequiredAlias = maps:get(require_alias, Data, ""), - Fields = maps:get(fields, Data, ""), - #{ - action => create, - '_index' => emqx_placeholder:preproc_tmpl(Index), - '_id' => emqx_placeholder:preproc_tmpl(Id), - require_alias => emqx_placeholder:preproc_tmpl(RequiredAlias), - fields => emqx_placeholder:preproc_tmpl(Fields) - }; -preproc_data_template(#{action := index} = Data) -> - Data1 = preproc_data_template(Data#{action => create}), - Data1#{action => index}; -preproc_data_template(#{action := delete} = Data) -> - Data1 = preproc_data_template(Data#{action => create}), - Data2 = Data1#{action => delete}, - maps:remove(fields, Data2); -preproc_data_template(#{action := update} = Data) -> - Data1 = preproc_data_template(Data#{action => index}), - DocAsUpsert = maps:get(doc_as_upsert, Data, ""), - Upsert = maps:get(upsert, Data, ""), - Data1#{ - action => update, - doc_as_upsert => emqx_placeholder:preproc_tmpl(DocAsUpsert), - upsert => emqx_placeholder:preproc_tmpl(Upsert) - }. - -try_render_message({ChannelId, Msg}, Channels) -> - case maps:find(ChannelId, Channels) of - {ok, #{data := Data}} -> - {ok, proc_data(Data, Msg)}; - _ -> - {error, {unrecoverable_error, {invalid_channel_id, ChannelId}}} - end. +get_body_template(#{doc := Doc}) -> Doc; +get_body_template(_) -> undefined. diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl index f00ae8523..8f54694e9 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl @@ -317,7 +317,7 @@ on_query(InstId, {send_message, Msg}, State) -> %% BridgeV2 entrypoint on_query( InstId, - {ActionId, MsgAndBody}, + {ActionId, Msg}, State = #{installed_actions := InstalledActions} ) when is_binary(ActionId) -> case {maps:get(request, State, undefined), maps:get(ActionId, InstalledActions, undefined)} of @@ -334,10 +334,10 @@ on_query( body := Body, headers := Headers, request_timeout := Timeout - } = process_request_and_action(Request, ActionState, MsgAndBody), + } = process_request_and_action(Request, ActionState, Msg), %% bridge buffer worker has retry, do not let ehttpc retry Retry = 2, - ClientId = clientid(MsgAndBody), + ClientId = clientid(Msg), on_query( InstId, {ClientId, Method, {Path, Headers, Body}, Timeout, Retry}, @@ -430,7 +430,7 @@ on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) -> %% BridgeV2 entrypoint on_query_async( InstId, - {ActionId, MsgAndBody}, + {ActionId, Msg}, ReplyFunAndArgs, State = #{installed_actions := InstalledActions} ) when is_binary(ActionId) -> @@ -448,8 +448,8 @@ on_query_async( body := Body, headers := Headers, request_timeout := Timeout - } = process_request_and_action(Request, ActionState, MsgAndBody), - ClientId = clientid(MsgAndBody), + } = process_request_and_action(Request, ActionState, Msg), + ClientId = clientid(Msg), on_query_async( InstId, {ClientId, Method, {Path, Headers, Body}, Timeout}, @@ -629,7 +629,7 @@ maybe_parse_template(Key, Conf) -> parse_template(String) -> emqx_template:parse(String). -process_request_and_action(Request, ActionState, {Msg, Body}) -> +process_request_and_action(Request, ActionState, Msg) -> MethodTemplate = maps:get(method, ActionState), Method = make_method(render_template_string(MethodTemplate, Msg)), PathPrefix = unicode:characters_to_list(render_template(maps:get(path, Request), Msg)), @@ -647,17 +647,15 @@ process_request_and_action(Request, ActionState, {Msg, Body}) -> render_headers(HeadersTemplate1, Msg), render_headers(HeadersTemplate2, Msg) ), + BodyTemplate = maps:get(body, ActionState), + Body = render_request_body(BodyTemplate, Msg), #{ method => Method, path => Path, body => Body, headers => Headers, request_timeout => maps:get(request_timeout, ActionState) - }; -process_request_and_action(Request, ActionState, Msg) -> - BodyTemplate = maps:get(body, ActionState), - Body = render_request_body(BodyTemplate, Msg), - process_request_and_action(Request, ActionState, {Msg, Body}). + }. merge_proplist(Proplist1, Proplist2) -> lists:foldl( @@ -877,7 +875,6 @@ redact_request({Path, Headers}) -> redact_request({Path, Headers, _Body}) -> {Path, Headers, <<"******">>}. -clientid({Msg, _Body}) -> clientid(Msg); clientid(Msg) -> maps:get(clientid, Msg, undefined). -ifdef(TEST). diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index 822e8429b..655892d88 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -190,7 +190,7 @@ connector_structs() -> mk( hoconsc:map(name, ref(emqx_bridge_es_connector, config)), #{ - desc => <<"Elastis Search Connector Config">>, + desc => <<"ElasticSearch Connector Config">>, required => false } )} diff --git a/rel/i18n/emqx_bridge_es.hocon b/rel/i18n/emqx_bridge_es.hocon index 78299c4ee..62778a712 100644 --- a/rel/i18n/emqx_bridge_es.hocon +++ b/rel/i18n/emqx_bridge_es.hocon @@ -1,5 +1,10 @@ emqx_bridge_es { +elasticsearch.desc: +"""Elasticsearch Bridge""" +elasticsearch.label: +"""ElasticSearch""" + config_enable.desc: """Enable or disable this bridge""" @@ -74,15 +79,9 @@ desc_name.desc: desc_name.label: """Bridge Name""" -config_parameters_action.desc: -"""TODO""" - -config_parameters_action.label: -"""Action""" - config_parameters_index.desc: -"""Name of the data stream, index, or index alias to perform the action on. -This parameter is required if a is not specified in the request path.""" +"""Name of index, or index alias to perform the action on. +This parameter is required.""" config_parameters_index.label: """_index""" @@ -97,28 +96,10 @@ config_parameters_require_alias.desc: config_parameters_require_alias.label: """_require_alias""" -config_parameters_fields.desc: -"""The document source to index. Required for create and index operations.""" -config_parameters_fields.label: -"""fields""" - -config_parameters_doc_as_upsert.desc: -"""Instead of sending a partial doc plus an upsert doc, you can set doc_as_upsert to true -to use the contents of doc as the upsert value.""" -config_parameters_doc_as_upsert.label: -"""doc_as_upsert""" - -config_parameters_upsert.desc: -"""If the document does not already exist, the contents of the upsert element are inserted as a new document.""" -config_parameters_upsert.label: -"""upsert""" - - -action_parameters_data.desc: -"""ElasticSearch action parameter data""" - -action_parameters_data.label: -"""Parameter Data""" +config_parameters_doc.desc: +"""JSON document""" +config_parameters_doc.label: +"""doc""" action_parameters.desc: """ElasticSearch action parameters""" @@ -126,4 +107,38 @@ action_parameters.desc: action_parameters.label: """Parameters""" +config_overwrite.desc: +"""Set to false If a document with the specified _id already exists(conflict), the operation will fail.""" + +config_overwrite.label: +"""overwrite""" + +action_config.desc: +"""ElasticSearch Action Configuration""" +action_config.label: +"""ElasticSearch Action Config""" + +action_create.desc: +"""Adds a JSON document to the specified index and makes it searchable. +If the target is an index and the document already exists, +the request updates the document and increments its version.""" +action_create.label: +"""Create Doc""" + +action_delete.desc: +"""Removes a JSON document from the specified index.""" +action_delete.label: +"""Delete Doc""" + +action_update.desc: +"""Updates a document using the specified doc.""" +action_update.label: +"""Update Doc""" + +action_resource_opts.desc: +"""Resource options.""" + +action_resource_opts.label: +"""Resource Options""" + } diff --git a/rel/i18n/emqx_bridge_es_connector.hocon b/rel/i18n/emqx_bridge_es_connector.hocon index f980b3aca..ddd53e0fc 100644 --- a/rel/i18n/emqx_bridge_es_connector.hocon +++ b/rel/i18n/emqx_bridge_es_connector.hocon @@ -24,11 +24,6 @@ config_auth_basic_password.desc: config_auth_basic_password.label: """HTTP Basic Auth Password""" -config_base_url.desc: -"""The base URL of the external ElasticSearch service's REST interface.""" -config_base_url.label: -"""ElasticSearch REST Service Base URL""" - config_max_retries.desc: """HTTP request max retry times if failed.""" diff --git a/scripts/spellcheck/dicts/emqx.txt b/scripts/spellcheck/dicts/emqx.txt index 401df33a6..1d98d82db 100644 --- a/scripts/spellcheck/dicts/emqx.txt +++ b/scripts/spellcheck/dicts/emqx.txt @@ -297,3 +297,5 @@ Syskeeper msacc now_us ns +elasticsearch +ElasticSearch