diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index ceba8e202..5ce60fe6c 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -87,7 +87,8 @@ hard_coded_action_info_modules_ee() -> emqx_bridge_syskeeper_action_info, emqx_bridge_timescale_action_info, emqx_bridge_redis_action_info, - emqx_bridge_iotdb_action_info + emqx_bridge_iotdb_action_info, + emqx_bridge_es_action_info ]. -else. hard_coded_action_info_modules_ee() -> diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl index 74239ffc0..aa06b547d 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl @@ -361,11 +361,16 @@ is_bad_schema(#{type := ?MAP(_, ?R_REF(Module, TypeName))}) -> [] -> false; _ -> - {true, #{ - schema_module => Module, - type_name => TypeName, - missing_fields => MissingFields - }} + %% elasticsearch is new and doesn't have local_topic + case MissingFields of + [local_topic] when Module =:= emqx_bridge_es -> false; + _ -> + {true, #{ + schema_module => Module, + type_name => TypeName, + missing_fields => MissingFields + }} + end end. -endif. diff --git a/apps/emqx_bridge_es/BSL.txt b/apps/emqx_bridge_es/BSL.txt new file mode 100644 index 000000000..0acc0e696 --- /dev/null +++ b/apps/emqx_bridge_es/BSL.txt @@ -0,0 +1,94 @@ +Business Source License 1.1 + +Licensor: Hangzhou EMQ Technologies Co., Ltd. +Licensed Work: EMQX Enterprise Edition + The Licensed Work is (c) 2023 + Hangzhou EMQ Technologies Co., Ltd. +Additional Use Grant: Students and educators are granted right to copy, + modify, and create derivative work for research + or education. +Change Date: 2027-02-01 +Change License: Apache License, Version 2.0 + +For information about alternative licensing arrangements for the Software, +please contact Licensor: https://www.emqx.com/en/contact + +Notice + +The Business Source License (this document, or the “License”) is not an Open +Source license. However, the Licensed Work will eventually be made available +under an Open Source License, as stated in this License. + +License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved. +“Business Source License” is a trademark of MariaDB Corporation Ab. + +----------------------------------------------------------------------------- + +Business Source License 1.1 + +Terms + +The Licensor hereby grants you the right to copy, modify, create derivative +works, redistribute, and make non-production use of the Licensed Work. The +Licensor may make an Additional Use Grant, above, permitting limited +production use. + +Effective on the Change Date, or the fourth anniversary of the first publicly +available distribution of a specific version of the Licensed Work under this +License, whichever comes first, the Licensor hereby grants you rights under +the terms of the Change License, and the rights granted in the paragraph +above terminate. + +If your use of the Licensed Work does not comply with the requirements +currently in effect as described in this License, you must purchase a +commercial license from the Licensor, its affiliated entities, or authorized +resellers, or you must refrain from using the Licensed Work. + +All copies of the original and modified Licensed Work, and derivative works +of the Licensed Work, are subject to this License. This License applies +separately for each version of the Licensed Work and the Change Date may vary +for each version of the Licensed Work released by Licensor. + +You must conspicuously display this License on each original or modified copy +of the Licensed Work. If you receive the Licensed Work in original or +modified form from a third party, the terms and conditions set forth in this +License apply to your use of that work. + +Any use of the Licensed Work in violation of this License will automatically +terminate your rights under this License for the current and all other +versions of the Licensed Work. + +This License does not grant you any right in any trademark or logo of +Licensor or its affiliates (provided that you may use a trademark or logo of +Licensor as expressly required by this License). + +TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON +AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS, +EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND +TITLE. + +MariaDB hereby grants you permission to use this License’s text to license +your works, and to refer to it using the trademark “Business Source License”, +as long as you comply with the Covenants of Licensor below. + +Covenants of Licensor + +In consideration of the right to use this License’s text and the “Business +Source License” name and trademark, Licensor covenants to MariaDB, and to all +other recipients of the licensed work to be provided by Licensor: + +1. To specify as the Change License the GPL Version 2.0 or any later version, + or a license that is compatible with GPL Version 2.0 or a later version, + where “compatible” means that software provided under the Change License can + be included in a program with software provided under GPL Version 2.0 or a + later version. Licensor may specify additional Change Licenses without + limitation. + +2. To either: (a) specify an additional grant of rights to use that does not + impose any additional restriction on the right granted in this License, as + the Additional Use Grant; or (b) insert the text “None”. + +3. To specify a Change Date. + +4. Not to modify this License in any other way. diff --git a/apps/emqx_bridge_es/README.md b/apps/emqx_bridge_es/README.md new file mode 100644 index 000000000..b91af4aba --- /dev/null +++ b/apps/emqx_bridge_es/README.md @@ -0,0 +1,23 @@ +# Apache ElasticSearch Data Integration Bridge + +This application houses the ElasticSearch data integration bridge for EMQX Enterprise + Edition. It provides the means to connect to ElasticSearch and publish messages to it. + +It implements the connection management and interaction without need for a + separate connector app, since it's not used by authentication and authorization + applications. + + + +# Contributing +Please see our [contributing.md](../../CONTRIBUTING.md). + +# License + +See [BSL](./BSL.txt). diff --git a/apps/emqx_bridge_es/docker-ct b/apps/emqx_bridge_es/docker-ct new file mode 100644 index 000000000..80f0d394b --- /dev/null +++ b/apps/emqx_bridge_es/docker-ct @@ -0,0 +1 @@ +toxiproxy diff --git a/apps/emqx_bridge_es/etc/emqx_bridge_es.conf b/apps/emqx_bridge_es/etc/emqx_bridge_es.conf new file mode 100644 index 000000000..e69de29bb diff --git a/apps/emqx_bridge_es/include/emqx_bridge_es.hrl b/apps/emqx_bridge_es/include/emqx_bridge_es.hrl new file mode 100644 index 000000000..8393cff2b --- /dev/null +++ b/apps/emqx_bridge_es/include/emqx_bridge_es.hrl @@ -0,0 +1,8 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-ifndef(EMQX_BRIDGE_ES_HRL). +-define(EMQX_BRIDGE_ES_HRL, true). + +-endif. diff --git a/apps/emqx_bridge_es/rebar.config b/apps/emqx_bridge_es/rebar.config new file mode 100644 index 000000000..2a3526e08 --- /dev/null +++ b/apps/emqx_bridge_es/rebar.config @@ -0,0 +1,15 @@ +%% -*- mode: erlang -*- + +{erl_opts, [ + debug_info +]}. + +{deps, [ + {emqx, {path, "../../apps/emqx"}}, + {emqx_connector, {path, "../../apps/emqx_connector"}}, + {emqx_resource, {path, "../../apps/emqx_resource"}}, + {emqx_bridge, {path, "../../apps/emqx_bridge"}}, + {emqx_bridge_http, {path, "../emqx_bridge_http"}} +]}. +{plugins, [rebar3_path_deps]}. +{project_plugins, [erlfmt]}. diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es.app.src b/apps/emqx_bridge_es/src/emqx_bridge_es.app.src new file mode 100644 index 000000000..9e98cd33e --- /dev/null +++ b/apps/emqx_bridge_es/src/emqx_bridge_es.app.src @@ -0,0 +1,23 @@ +%% -*- mode: erlang -*- +{application, emqx_bridge_es, [ + {description, "EMQX Enterprise Elastic Search Bridge"}, + {vsn, "0.1.0"}, + {modules, [ + emqx_bridge_es, + emqx_bridge_es_connector + ]}, + {registered, []}, + {applications, [ + kernel, + stdlib, + emqx_resource, + emqx_connector + ]}, + {env, []}, + {licenses, ["Business Source License 1.1"]}, + {maintainers, ["EMQX Team "]}, + {links, [ + {"Homepage", "https://emqx.io/"}, + {"Github", "https://github.com/emqx/emqx"} + ]} +]}. diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es.erl b/apps/emqx_bridge_es/src/emqx_bridge_es.erl new file mode 100644 index 000000000..b575f32ed --- /dev/null +++ b/apps/emqx_bridge_es/src/emqx_bridge_es.erl @@ -0,0 +1,227 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_es). + +-include("emqx_bridge_es.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). + +-export([bridge_v2_examples/1]). + +%% hocon_schema API +-export([namespace/0, roots/0, fields/1, desc/1]). + +-define(CONNECTOR_TYPE, elasticsearch). +-define(ACTION_TYPE, ?CONNECTOR_TYPE). + +namespace() -> "bridge_elasticsearch". + +roots() -> []. + +fields(action) -> + {elasticsearch, + ?HOCON( + ?MAP(action_name, ?R_REF(action_config)), + #{ + required => false, + desc => ?DESC(elasticsearch) + } + )}; +fields(action_config) -> + emqx_resource_schema:override( + emqx_bridge_v2_schema:make_consumer_action_schema( + ?HOCON( + ?UNION(fun action_union_member_selector/1), + #{ + required => true, desc => ?DESC("action_parameters") + } + ) + ), + [ + {resource_opts, + ?HOCON(?R_REF(action_resource_opts), #{ + default => #{}, + desc => ?DESC(emqx_resource_schema, "resource_opts") + })} + ] + ); +fields(action_resource_opts) -> + lists:filter( + fun({K, _V}) -> + not lists:member(K, unsupported_opts()) + end, + emqx_bridge_v2_schema:resource_opts_fields() + ); +fields(action_create) -> + [ + action(create), + index(), + id(false), + doc(true), + routing(), + require_alias(), + overwrite() + | http_common_opts() + ]; +fields(action_delete) -> + [action(delete), index(), id(true), routing() | http_common_opts()]; +fields(action_update) -> + [ + action(update), + index(), + id(true), + doc(true), + routing(), + require_alias() + | http_common_opts() + ]; +fields("post_bridge_v2") -> + emqx_bridge_schema:type_and_name_fields(elasticsearch) ++ fields(action_config); +fields("put_bridge_v2") -> + fields(action_config); +fields("get_bridge_v2") -> + emqx_bridge_schema:status_fields() ++ fields("post_bridge_v2"). + +action_union_member_selector(all_union_members) -> + [ + ?R_REF(action_create), + ?R_REF(action_delete), + ?R_REF(action_update) + ]; +action_union_member_selector({value, Value}) -> + case Value of + #{<<"action">> := <<"create">>} -> + [?R_REF(action_create)]; + #{<<"action">> := <<"delete">>} -> + [?R_REF(action_delete)]; + #{<<"action">> := <<"update">>} -> + [?R_REF(action_update)]; + _ -> + Expected = "create | delete | update", + throw(#{ + field_name => action, + expected => Expected + }) + end. + +action(Action) -> + {action, + ?HOCON( + Action, + #{ + required => true, + desc => atom_to_binary(Action) + } + )}. + +overwrite() -> + {overwrite, + ?HOCON( + boolean(), + #{ + required => false, + default => true, + desc => ?DESC("config_overwrite") + } + )}. + +index() -> + {index, + ?HOCON( + binary(), + #{ + required => true, + example => <<"${payload.index}">>, + desc => ?DESC("config_parameters_index") + } + )}. + +id(Required) -> + {id, + ?HOCON( + binary(), + #{ + required => Required, + example => <<"${payload.id}">>, + desc => ?DESC("config_parameters_id") + } + )}. + +doc(Required) -> + {doc, + ?HOCON( + binary(), + #{ + required => Required, + example => <<"${payload.doc}">>, + desc => ?DESC("config_parameters_doc") + } + )}. + +http_common_opts() -> + lists:filter( + fun({K, _}) -> + not lists:member(K, [path, method, body, headers, request_timeout]) + end, + emqx_bridge_http_schema:fields("parameters_opts") + ). + +routing() -> + {routing, + ?HOCON( + binary(), + #{ + required => false, + example => <<"${payload.routing}">>, + desc => ?DESC("config_routing") + } + )}. + +require_alias() -> + {require_alias, + ?HOCON( + boolean(), + #{ + required => false, + desc => ?DESC("config_require_alias") + } + )}. + +bridge_v2_examples(Method) -> + [ + #{ + <<"elasticsearch">> => + #{ + summary => <<"Elastic Search Bridge">>, + value => emqx_bridge_v2_schema:action_values( + Method, ?ACTION_TYPE, ?CONNECTOR_TYPE, action_values() + ) + } + } + ]. + +action_values() -> + #{ + parameters => #{ + action => create, + index => <<"${payload.index}">>, + overwrite => true, + doc => <<"${payload.doc}">> + } + }. + +unsupported_opts() -> + [ + batch_size, + batch_time + ]. + +desc(elasticsearch) -> ?DESC(elasticsearch); +desc(action_config) -> ?DESC(action_config); +desc(action_create) -> ?DESC(action_create); +desc(action_delete) -> ?DESC(action_delete); +desc(action_update) -> ?DESC(action_update); +desc(action_resource_opts) -> ?DESC(action_resource_opts); +desc(_) -> undefined. diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es_action_info.erl b/apps/emqx_bridge_es/src/emqx_bridge_es_action_info.erl new file mode 100644 index 000000000..b2f2ff777 --- /dev/null +++ b/apps/emqx_bridge_es/src/emqx_bridge_es_action_info.erl @@ -0,0 +1,22 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_es_action_info). + +-behaviour(emqx_action_info). + +-elvis([{elvis_style, invalid_dynamic_call, disable}]). + +%% behaviour callbacks +-export([ + action_type_name/0, + connector_type_name/0, + schema_module/0 +]). + +-define(ACTION_TYPE, elasticsearch). + +action_type_name() -> ?ACTION_TYPE. +connector_type_name() -> ?ACTION_TYPE. + +schema_module() -> emqx_bridge_es. diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl b/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl new file mode 100644 index 000000000..fe86eac56 --- /dev/null +++ b/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl @@ -0,0 +1,373 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_es_connector). + +-behaviour(emqx_resource). + +-include("emqx_bridge_es.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +%% `emqx_resource' API +-export([ + callback_mode/0, + on_start/2, + on_stop/2, + on_get_status/2, + on_query/3, + on_query_async/4, + on_add_channel/4, + on_remove_channel/3, + on_get_channels/1, + on_get_channel_status/3 +]). + +-export([ + namespace/0, + roots/0, + fields/1, + desc/1, + connector_examples/1, + connector_example_values/0 +]). + +%% emqx_connector_resource behaviour callbacks +-export([connector_config/2]). + +-type config() :: + #{ + base_url := #{ + scheme := http | https, + host := iolist(), + port := inet:port_number(), + path := _ + }, + connect_timeout := pos_integer(), + pool_type := random | hash, + pool_size := pos_integer(), + request => undefined | map(), + atom() => _ + }. + +-type state() :: + #{ + base_path := _, + connect_timeout := pos_integer(), + pool_type := random | hash, + channels := map(), + request => undefined | map(), + atom() => _ + }. + +-type manager_id() :: binary(). + +-define(CONNECTOR_TYPE, elasticsearch). + +%%------------------------------------------------------------------------------------- +%% connector examples +%%------------------------------------------------------------------------------------- +connector_examples(Method) -> + [ + #{ + <<"elasticsearch">> => + #{ + summary => <<"Elastic Search Connector">>, + value => emqx_connector_schema:connector_values( + Method, ?CONNECTOR_TYPE, connector_example_values() + ) + } + } + ]. + +connector_example_values() -> + #{ + name => <<"elasticsearch_connector">>, + type => elasticsearch, + enable => true, + authentication => #{ + <<"username">> => <<"root">>, + <<"password">> => <<"******">> + }, + base_url => <<"http://127.0.0.1:9200/">>, + connect_timeout => <<"15s">>, + pool_type => <<"random">>, + pool_size => 8, + enable_pipelining => 100, + ssl => #{enable => false} + }. + +%%------------------------------------------------------------------------------------- +%% schema +%%------------------------------------------------------------------------------------- +namespace() -> "elasticsearch". + +roots() -> + [{config, #{type => ?R_REF(config)}}]. + +fields(config) -> + lists:filter( + fun({K, _}) -> not lists:member(K, [url, request, retry_interval, headers]) end, + emqx_bridge_http_schema:fields("config_connector") + ) ++ + fields("connection_fields"); +fields("connection_fields") -> + [ + {base_url, + ?HOCON( + emqx_schema:url(), + #{ + required => true, + desc => ?DESC(emqx_bridge_es, "config_base_url") + } + )}, + {authentication, + ?HOCON( + ?UNION([?R_REF(auth_basic)]), + #{ + desc => ?DESC("config_authentication") + } + )} + ]; +fields(auth_basic) -> + [ + {username, + ?HOCON(binary(), #{ + required => true, + desc => ?DESC("config_auth_basic_username") + })}, + {password, + emqx_schema_secret:mk(#{ + required => true, + desc => ?DESC("config_auth_basic_password") + })} + ]; +fields("post") -> + emqx_connector_schema:type_and_name_fields(elasticsearch) ++ fields(config); +fields("put") -> + fields(config); +fields("get") -> + emqx_bridge_schema:status_fields() ++ fields("post"). + +desc(config) -> + ?DESC("desc_config"); +desc(auth_basic) -> + "Basic Authentication"; +desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> + ["Configuration for Elastic Search using `", string:to_upper(Method), "` method."]; +desc(_) -> + undefined. + +connector_config(Conf, #{name := Name, parse_confs := ParseConfs}) -> + #{ + base_url := BaseUrl, + authentication := + #{ + username := Username, + password := Password0 + } + } = Conf, + + Password = emqx_secret:unwrap(Password0), + Base64 = base64:encode(<>), + BasicToken = <<"Basic ", Base64/binary>>, + + WebhookConfig = + Conf#{ + method => <<"post">>, + url => BaseUrl, + headers => [ + {<<"Content-type">>, <<"application/json">>}, + {<<"Authorization">>, BasicToken} + ] + }, + ParseConfs( + <<"http">>, + Name, + WebhookConfig + ). + +%%------------------------------------------------------------------------------------- +%% `emqx_resource' API +%%------------------------------------------------------------------------------------- +callback_mode() -> async_if_possible. + +-spec on_start(manager_id(), config()) -> {ok, state()} | no_return(). +on_start(InstanceId, Config) -> + case emqx_bridge_http_connector:on_start(InstanceId, Config) of + {ok, State} -> + ?SLOG(info, #{ + msg => "elasticsearch_bridge_started", + instance_id => InstanceId, + request => maps:get(request, State, <<>>) + }), + ?tp(elasticsearch_bridge_started, #{instance_id => InstanceId}), + {ok, State#{channels => #{}}}; + {error, Reason} -> + ?SLOG(error, #{ + msg => "failed_to_start_elasticsearch_bridge", + instance_id => InstanceId, + request => maps:get(request, Config, <<>>), + reason => Reason + }), + throw(failed_to_start_elasticsearch_bridge) + end. + +-spec on_stop(manager_id(), state()) -> ok | {error, term()}. +on_stop(InstanceId, State) -> + ?SLOG(info, #{ + msg => "stopping_elasticsearch_bridge", + connector => InstanceId + }), + Res = emqx_bridge_http_connector:on_stop(InstanceId, State), + ?tp(elasticsearch_bridge_stopped, #{instance_id => InstanceId}), + Res. + +-spec on_get_status(manager_id(), state()) -> + {connected, state()} | {disconnected, state(), term()}. +on_get_status(InstanceId, State) -> + emqx_bridge_http_connector:on_get_status(InstanceId, State). + +-spec on_query(manager_id(), tuple(), state()) -> + {ok, pos_integer(), [term()], term()} + | {ok, pos_integer(), [term()]} + | {error, term()}. +on_query(InstanceId, {ChannelId, Msg} = Req, State) -> + ?tp(elasticsearch_bridge_on_query, #{instance_id => InstanceId}), + ?SLOG(debug, #{ + msg => "elasticsearch_bridge_on_query_called", + instance_id => InstanceId, + send_message => Req, + state => emqx_utils:redact(State) + }), + handle_response( + emqx_bridge_http_connector:on_query( + InstanceId, {ChannelId, Msg}, State + ) + ). + +-spec on_query_async(manager_id(), tuple(), {function(), [term()]}, state()) -> + {ok, pid()} | {error, empty_request}. +on_query_async( + InstanceId, {ChannelId, Msg} = Req, ReplyFunAndArgs0, State +) -> + ?tp(elasticsearch_bridge_on_query_async, #{instance_id => InstanceId}), + ?SLOG(debug, #{ + msg => "elasticsearch_bridge_on_query_async_called", + instance_id => InstanceId, + send_message => Req, + state => emqx_utils:redact(State) + }), + ReplyFunAndArgs = + { + fun(Result) -> + Response = handle_response(Result), + emqx_resource:apply_reply_fun(ReplyFunAndArgs0, Response) + end, + [] + }, + emqx_bridge_http_connector:on_query_async( + InstanceId, {ChannelId, Msg}, ReplyFunAndArgs, State + ). + +on_add_channel( + InstanceId, + #{channels := Channels} = State0, + ChannelId, + #{parameters := Parameter} +) -> + case maps:is_key(ChannelId, Channels) of + true -> + {error, already_exists}; + _ -> + Parameter1 = Parameter#{ + path => path(Parameter), + method => method(Parameter), + body => get_body_template(Parameter) + }, + {ok, State} = emqx_bridge_http_connector:on_add_channel( + InstanceId, State0, ChannelId, #{parameters => Parameter1} + ), + Channel = Parameter1, + Channels2 = Channels#{ChannelId => Channel}, + {ok, State#{channels => Channels2}} + end. + +on_remove_channel(InstanceId, #{channels := Channels} = OldState0, ChannelId) -> + {ok, OldState} = emqx_bridge_http_connector:on_remove_channel(InstanceId, OldState0, ChannelId), + Channels2 = maps:remove(ChannelId, Channels), + {ok, OldState#{channels => Channels2}}. + +on_get_channels(InstanceId) -> + emqx_bridge_v2:get_channels_for_connector(InstanceId). + +on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) -> + case maps:is_key(ChannelId, Channels) of + true -> + connected; + _ -> + {error, not_exists} + end. + +%%-------------------------------------------------------------------- +%% Internal Functions +%%-------------------------------------------------------------------- +%% delete DELETE //_doc/<_id> +path(#{action := delete, id := Id, index := Index} = Action) -> + BasePath = ["/", Index, "/_doc/", Id], + Qs = add_query_string([routing], Action), + BasePath ++ Qs; +%% update POST //_update/<_id> +path(#{action := update, id := Id, index := Index} = Action) -> + BasePath = ["/", Index, "/_update/", Id], + Qs = add_query_string([routing, require_alias], Action), + BasePath ++ Qs; +%% create with id //_doc/_id +path(#{action := create, index := Index, id := Id} = Action) -> + BasePath = ["/", Index, "/_doc/", Id], + Qs = + case maps:get(overwrite, Action, true) of + true -> + add_query_string([routing, require_alias], Action); + false -> + Action1 = Action#{op_type => "create"}, + add_query_string([routing, require_alias, op_type], Action1) + end, + BasePath ++ Qs; +%% create without id POST //_doc/ +path(#{action := create, index := Index} = Action) -> + BasePath = ["/", Index, "/_doc/"], + Qs = add_query_string([routing, require_alias], Action), + BasePath ++ Qs. + +method(#{action := create}) -> <<"POST">>; +method(#{action := delete}) -> <<"DELETE">>; +method(#{action := update}) -> <<"POST">>. + +add_query_string(Keys, Param0) -> + Param1 = maps:with(Keys, Param0), + FoldFun = fun(K, V, Acc) -> [[atom_to_list(K), "=", to_str(V)] | Acc] end, + case maps:fold(FoldFun, [], Param1) of + "" -> ""; + QString -> "?" ++ lists:join("&", QString) + end. + +to_str(List) when is_list(List) -> List; +to_str(false) -> "false"; +to_str(true) -> "true"; +to_str(Atom) when is_atom(Atom) -> atom_to_list(Atom). + +handle_response({ok, Code, _Headers, _Body} = Resp) when Code =:= 200; Code =:= 201 -> + Resp; +handle_response({ok, Code, _Body} = Resp) when Code =:= 200; Code =:= 201 -> + Resp; +handle_response({ok, Code, _Headers, Body}) -> + {error, #{code => Code, body => Body}}; +handle_response({ok, Code, Body}) -> + {error, #{code => Code, body => Body}}; +handle_response({error, _} = Error) -> + Error. + +get_body_template(#{doc := Doc}) -> Doc; +get_body_template(_) -> undefined. diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl index 51375fc04..8f54694e9 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl @@ -337,7 +337,7 @@ on_query( } = process_request_and_action(Request, ActionState, Msg), %% bridge buffer worker has retry, do not let ehttpc retry Retry = 2, - ClientId = maps:get(clientid, Msg, undefined), + ClientId = clientid(Msg), on_query( InstId, {ClientId, Method, {Path, Headers, Body}, Timeout, Retry}, @@ -449,7 +449,7 @@ on_query_async( headers := Headers, request_timeout := Timeout } = process_request_and_action(Request, ActionState, Msg), - ClientId = maps:get(clientid, Msg, undefined), + ClientId = clientid(Msg), on_query_async( InstId, {ClientId, Method, {Path, Headers, Body}, Timeout}, @@ -632,9 +632,6 @@ parse_template(String) -> process_request_and_action(Request, ActionState, Msg) -> MethodTemplate = maps:get(method, ActionState), Method = make_method(render_template_string(MethodTemplate, Msg)), - BodyTemplate = maps:get(body, ActionState), - Body = render_request_body(BodyTemplate, Msg), - PathPrefix = unicode:characters_to_list(render_template(maps:get(path, Request), Msg)), PathSuffix = unicode:characters_to_list(render_template(maps:get(path, ActionState), Msg)), @@ -650,6 +647,8 @@ process_request_and_action(Request, ActionState, Msg) -> render_headers(HeadersTemplate1, Msg), render_headers(HeadersTemplate2, Msg) ), + BodyTemplate = maps:get(body, ActionState), + Body = render_request_body(BodyTemplate, Msg), #{ method => Method, path => Path, @@ -732,7 +731,7 @@ formalize_request(_Method, BasePath, {Path, Headers}) -> %% because an HTTP server may handle paths like %% "/a/b/c/", "/a/b/c" and "/a//b/c" differently. %% -%% So we try to avoid unneccessary path normalization. +%% So we try to avoid unnecessary path normalization. %% %% See also: `join_paths_test_/0` join_paths(Path1, Path2) -> @@ -876,6 +875,8 @@ redact_request({Path, Headers}) -> redact_request({Path, Headers, _Body}) -> {Path, Headers, <<"******">>}. +clientid(Msg) -> maps:get(clientid, Msg, undefined). + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl index 4286a59e4..7fbcfc6db 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl @@ -90,7 +90,7 @@ connector_example_values() -> enable => true, authentication => #{ <<"username">> => <<"root">>, - <<"password">> => <<"*****">> + <<"password">> => <<"******">> }, base_url => <<"http://iotdb.local:18080/">>, connect_timeout => <<"15s">>, @@ -109,7 +109,10 @@ roots() -> [{config, #{type => hoconsc:ref(?MODULE, config)}}]. fields(config) -> - proplists_without([url, headers], emqx_bridge_http_schema:fields("config_connector")) ++ + proplists_without( + [url, request, retry_interval, headers], + emqx_bridge_http_schema:fields("config_connector") + ) ++ fields("connection_fields"); fields("connection_fields") -> [ @@ -206,7 +209,7 @@ on_start(InstanceId, Config) -> ?SLOG(error, #{ msg => "failed_to_start_iotdb_bridge", instance_id => InstanceId, - base_url => maps:get(request, Config, <<>>), + request => maps:get(request, Config, <<>>), reason => Reason }), throw(failed_to_start_iotdb_bridge) diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index 9e35015b0..655892d88 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -50,6 +50,8 @@ resource_type(redis) -> emqx_bridge_redis_connector; resource_type(iotdb) -> emqx_bridge_iotdb_connector; +resource_type(elasticsearch) -> + emqx_bridge_es_connector; resource_type(Type) -> error({unknown_connector_type, Type}). @@ -62,6 +64,8 @@ connector_impl_module(confluent_producer) -> emqx_bridge_confluent_producer; connector_impl_module(iotdb) -> emqx_bridge_iotdb_connector; +connector_impl_module(elasticsearch) -> + emqx_bridge_es_connector; connector_impl_module(_ConnectorType) -> undefined. @@ -181,6 +185,14 @@ connector_structs() -> desc => <<"IoTDB Connector Config">>, required => false } + )}, + {elasticsearch, + mk( + hoconsc:map(name, ref(emqx_bridge_es_connector, config)), + #{ + desc => <<"ElasticSearch Connector Config">>, + required => false + } )} ]. @@ -199,7 +211,8 @@ schema_modules() -> emqx_bridge_timescale, emqx_postgresql_connector_schema, emqx_bridge_redis_schema, - emqx_bridge_iotdb_connector + emqx_bridge_iotdb_connector, + emqx_bridge_es_connector ]. api_schemas(Method) -> @@ -227,7 +240,8 @@ api_schemas(Method) -> api_ref(emqx_bridge_timescale, <<"timescale">>, Method ++ "_connector"), api_ref(emqx_postgresql_connector_schema, <<"pgsql">>, Method ++ "_connector"), api_ref(emqx_bridge_redis_schema, <<"redis">>, Method ++ "_connector"), - api_ref(emqx_bridge_iotdb_connector, <<"iotdb">>, Method) + api_ref(emqx_bridge_iotdb_connector, <<"iotdb">>, Method), + api_ref(emqx_bridge_es_connector, <<"elasticsearch">>, Method) ]. api_ref(Module, Type, Method) -> diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index 0a3c9d744..b043ebacd 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -147,7 +147,9 @@ connector_type_to_bridge_types(syskeeper_proxy) -> connector_type_to_bridge_types(timescale) -> [timescale]; connector_type_to_bridge_types(iotdb) -> - [iotdb]. + [iotdb]; +connector_type_to_bridge_types(elasticsearch) -> + [elasticsearch]. actions_config_name() -> <<"actions">>. diff --git a/apps/emqx_machine/priv/reboot_lists.eterm b/apps/emqx_machine/priv/reboot_lists.eterm index cf3ad1523..f7e78c360 100644 --- a/apps/emqx_machine/priv/reboot_lists.eterm +++ b/apps/emqx_machine/priv/reboot_lists.eterm @@ -99,6 +99,7 @@ emqx_bridge_hstreamdb, emqx_bridge_influxdb, emqx_bridge_iotdb, + emqx_bridge_es, emqx_bridge_matrix, emqx_bridge_mongodb, emqx_bridge_mysql, diff --git a/apps/emqx_machine/src/emqx_machine.app.src b/apps/emqx_machine/src/emqx_machine.app.src index 6d7012313..320ddea02 100644 --- a/apps/emqx_machine/src/emqx_machine.app.src +++ b/apps/emqx_machine/src/emqx_machine.app.src @@ -3,7 +3,7 @@ {id, "emqx_machine"}, {description, "The EMQX Machine"}, % strict semver, bump manually! - {vsn, "0.2.17"}, + {vsn, "0.2.18"}, {modules, []}, {registered, []}, {applications, [kernel, stdlib, emqx_ctl]}, diff --git a/mix.exs b/mix.exs index 13d95ccb5..5ef6fef36 100644 --- a/mix.exs +++ b/mix.exs @@ -164,6 +164,7 @@ defmodule EMQXUmbrella.MixProject do :emqx_bridge_hstreamdb, :emqx_bridge_influxdb, :emqx_bridge_iotdb, + :emqx_bridge_es, :emqx_bridge_matrix, :emqx_bridge_mongodb, :emqx_bridge_mysql, diff --git a/rel/i18n/emqx_bridge_es.hocon b/rel/i18n/emqx_bridge_es.hocon new file mode 100644 index 000000000..62778a712 --- /dev/null +++ b/rel/i18n/emqx_bridge_es.hocon @@ -0,0 +1,144 @@ +emqx_bridge_es { + +elasticsearch.desc: +"""Elasticsearch Bridge""" +elasticsearch.label: +"""ElasticSearch""" + +config_enable.desc: +"""Enable or disable this bridge""" + +config_enable.label: +"""Enable Or Disable Bridge""" + +config_authentication.desc: +"""Authentication configuration""" + +config_authentication.label: +"""Authentication""" + +auth_basic.desc: +"""Parameters for basic authentication.""" + +auth_basic.label: +"""Basic auth params""" + +config_auth_basic_username.desc: +"""The username as configured at the IoTDB REST interface""" + +config_auth_basic_username.label: + """HTTP Basic Auth Username""" + +config_auth_basic_password.desc: +"""The password as configured at the IoTDB REST interface""" + +config_auth_basic_password.label: +"""HTTP Basic Auth Password""" + +config_base_url.desc: +"""The base URL of the external ElasticSearch service's REST interface.""" +config_base_url.label: +"""ElasticSearch REST Service Base URL""" + +config_target.desc: +"""Name of the data stream, index, or index alias to perform bulk actions on""" + +config_target.label: +"""Target""" + +config_require_alias.desc: +"""If true, the request’s actions must target an index alias. Defaults to false""" +config_require_alias.label: +"""Require Alias""" + +config_routing.desc: +"""Custom value used to route operations to a specific shard.""" +config_routing.label: +"""Routing""" + +config_wait_for_active_shards.desc: +"""The number of shard copies that must be active before proceeding with the operation. +Set to all or any positive integer up to the total number of shards in the index (number_of_replicas+1). +Default: 1, the primary shard""" + +config_max_retries.desc: +"""HTTP request max retry times if failed.""" + +config_max_retries.label: +"""HTTP Request Max Retries""" + +desc_config.desc: +"""Configuration for Apache IoTDB bridge.""" + +desc_config.label: +"""IoTDB Bridge Configuration""" + +desc_name.desc: +"""Bridge name, used as a human-readable description of the bridge.""" + +desc_name.label: +"""Bridge Name""" + +config_parameters_index.desc: +"""Name of index, or index alias to perform the action on. +This parameter is required.""" + +config_parameters_index.label: +"""_index""" + +config_parameters_id.desc: +"""The document ID. If no ID is specified, a document ID is automatically generated.""" +config_parameters_id.label: +"""_id""" + +config_parameters_require_alias.desc: +"""If true, the action must target an index alias. Defaults to false.""" +config_parameters_require_alias.label: +"""_require_alias""" + +config_parameters_doc.desc: +"""JSON document""" +config_parameters_doc.label: +"""doc""" + +action_parameters.desc: +"""ElasticSearch action parameters""" + +action_parameters.label: +"""Parameters""" + +config_overwrite.desc: +"""Set to false If a document with the specified _id already exists(conflict), the operation will fail.""" + +config_overwrite.label: +"""overwrite""" + +action_config.desc: +"""ElasticSearch Action Configuration""" +action_config.label: +"""ElasticSearch Action Config""" + +action_create.desc: +"""Adds a JSON document to the specified index and makes it searchable. +If the target is an index and the document already exists, +the request updates the document and increments its version.""" +action_create.label: +"""Create Doc""" + +action_delete.desc: +"""Removes a JSON document from the specified index.""" +action_delete.label: +"""Delete Doc""" + +action_update.desc: +"""Updates a document using the specified doc.""" +action_update.label: +"""Update Doc""" + +action_resource_opts.desc: +"""Resource options.""" + +action_resource_opts.label: +"""Resource Options""" + +} diff --git a/rel/i18n/emqx_bridge_es_connector.hocon b/rel/i18n/emqx_bridge_es_connector.hocon new file mode 100644 index 000000000..ddd53e0fc --- /dev/null +++ b/rel/i18n/emqx_bridge_es_connector.hocon @@ -0,0 +1,39 @@ +emqx_bridge_es_connector { + +config_authentication.desc: +"""Authentication configuration""" + +config_authentication.label: +"""Authentication""" + +auth_basic.desc: +"""Parameters for basic authentication.""" + +auth_basic.label: +"""Basic auth params""" + +config_auth_basic_username.desc: +"""The username as configured at the ElasticSearch REST interface""" + +config_auth_basic_username.label: + """HTTP Basic Auth Username""" + +config_auth_basic_password.desc: +"""The password as configured at the ElasticSearch REST interface""" + +config_auth_basic_password.label: +"""HTTP Basic Auth Password""" + +config_max_retries.desc: +"""HTTP request max retry times if failed.""" + +config_max_retries.label: +"""HTTP Request Max Retries""" + +desc_config.desc: +"""Configuration for ElasticSearch bridge.""" + +desc_config.label: +"""ElasticSearch Bridge Configuration""" + +} diff --git a/scripts/spellcheck/dicts/emqx.txt b/scripts/spellcheck/dicts/emqx.txt index 401df33a6..1d98d82db 100644 --- a/scripts/spellcheck/dicts/emqx.txt +++ b/scripts/spellcheck/dicts/emqx.txt @@ -297,3 +297,5 @@ Syskeeper msacc now_us ns +elasticsearch +ElasticSearch