diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index ceba8e202..5ce60fe6c 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -87,7 +87,8 @@ hard_coded_action_info_modules_ee() -> emqx_bridge_syskeeper_action_info, emqx_bridge_timescale_action_info, emqx_bridge_redis_action_info, - emqx_bridge_iotdb_action_info + emqx_bridge_iotdb_action_info, + emqx_bridge_es_action_info ]. -else. hard_coded_action_info_modules_ee() -> diff --git a/apps/emqx_bridge_es/.gitignore b/apps/emqx_bridge_es/.gitignore new file mode 100644 index 000000000..e9bc1c544 --- /dev/null +++ b/apps/emqx_bridge_es/.gitignore @@ -0,0 +1,19 @@ +.rebar3 + _* + .eunit + *.o + *.beam + *.plt + *.swp + *.swo + .erlang.cookie + ebin + log + erl_crash.dump + .rebar + logs + _build + .idea + *.iml + rebar3.crashdump + *~ diff --git a/apps/emqx_bridge_es/BSL.txt b/apps/emqx_bridge_es/BSL.txt new file mode 100644 index 000000000..0acc0e696 --- /dev/null +++ b/apps/emqx_bridge_es/BSL.txt @@ -0,0 +1,94 @@ +Business Source License 1.1 + +Licensor: Hangzhou EMQ Technologies Co., Ltd. +Licensed Work: EMQX Enterprise Edition + The Licensed Work is (c) 2023 + Hangzhou EMQ Technologies Co., Ltd. +Additional Use Grant: Students and educators are granted right to copy, + modify, and create derivative work for research + or education. +Change Date: 2027-02-01 +Change License: Apache License, Version 2.0 + +For information about alternative licensing arrangements for the Software, +please contact Licensor: https://www.emqx.com/en/contact + +Notice + +The Business Source License (this document, or the “License”) is not an Open +Source license. However, the Licensed Work will eventually be made available +under an Open Source License, as stated in this License. + +License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved. +“Business Source License” is a trademark of MariaDB Corporation Ab. + +----------------------------------------------------------------------------- + +Business Source License 1.1 + +Terms + +The Licensor hereby grants you the right to copy, modify, create derivative +works, redistribute, and make non-production use of the Licensed Work. The +Licensor may make an Additional Use Grant, above, permitting limited +production use. + +Effective on the Change Date, or the fourth anniversary of the first publicly +available distribution of a specific version of the Licensed Work under this +License, whichever comes first, the Licensor hereby grants you rights under +the terms of the Change License, and the rights granted in the paragraph +above terminate. + +If your use of the Licensed Work does not comply with the requirements +currently in effect as described in this License, you must purchase a +commercial license from the Licensor, its affiliated entities, or authorized +resellers, or you must refrain from using the Licensed Work. + +All copies of the original and modified Licensed Work, and derivative works +of the Licensed Work, are subject to this License. This License applies +separately for each version of the Licensed Work and the Change Date may vary +for each version of the Licensed Work released by Licensor. + +You must conspicuously display this License on each original or modified copy +of the Licensed Work. If you receive the Licensed Work in original or +modified form from a third party, the terms and conditions set forth in this +License apply to your use of that work. + +Any use of the Licensed Work in violation of this License will automatically +terminate your rights under this License for the current and all other +versions of the Licensed Work. + +This License does not grant you any right in any trademark or logo of +Licensor or its affiliates (provided that you may use a trademark or logo of +Licensor as expressly required by this License). + +TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON +AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS, +EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND +TITLE. + +MariaDB hereby grants you permission to use this License’s text to license +your works, and to refer to it using the trademark “Business Source License”, +as long as you comply with the Covenants of Licensor below. + +Covenants of Licensor + +In consideration of the right to use this License’s text and the “Business +Source License” name and trademark, Licensor covenants to MariaDB, and to all +other recipients of the licensed work to be provided by Licensor: + +1. To specify as the Change License the GPL Version 2.0 or any later version, + or a license that is compatible with GPL Version 2.0 or a later version, + where “compatible” means that software provided under the Change License can + be included in a program with software provided under GPL Version 2.0 or a + later version. Licensor may specify additional Change Licenses without + limitation. + +2. To either: (a) specify an additional grant of rights to use that does not + impose any additional restriction on the right granted in this License, as + the Additional Use Grant; or (b) insert the text “None”. + +3. To specify a Change Date. + +4. Not to modify this License in any other way. diff --git a/apps/emqx_bridge_es/README.md b/apps/emqx_bridge_es/README.md new file mode 100644 index 000000000..b91af4aba --- /dev/null +++ b/apps/emqx_bridge_es/README.md @@ -0,0 +1,23 @@ +# Apache ElasticSearch Data Integration Bridge + +This application houses the ElasticSearch data integration bridge for EMQX Enterprise + Edition. It provides the means to connect to ElasticSearch and publish messages to it. + +It implements the connection management and interaction without need for a + separate connector app, since it's not used by authentication and authorization + applications. + + + +# Contributing +Please see our [contributing.md](../../CONTRIBUTING.md). + +# License + +See [BSL](./BSL.txt). diff --git a/apps/emqx_bridge_es/docker-ct b/apps/emqx_bridge_es/docker-ct new file mode 100644 index 000000000..80f0d394b --- /dev/null +++ b/apps/emqx_bridge_es/docker-ct @@ -0,0 +1 @@ +toxiproxy diff --git a/apps/emqx_bridge_es/etc/emqx_bridge_es.conf b/apps/emqx_bridge_es/etc/emqx_bridge_es.conf new file mode 100644 index 000000000..e69de29bb diff --git a/apps/emqx_bridge_es/include/emqx_bridge_es.hrl b/apps/emqx_bridge_es/include/emqx_bridge_es.hrl new file mode 100644 index 000000000..8393cff2b --- /dev/null +++ b/apps/emqx_bridge_es/include/emqx_bridge_es.hrl @@ -0,0 +1,8 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-ifndef(EMQX_BRIDGE_ES_HRL). +-define(EMQX_BRIDGE_ES_HRL, true). + +-endif. diff --git a/apps/emqx_bridge_es/rebar.config b/apps/emqx_bridge_es/rebar.config new file mode 100644 index 000000000..2a3526e08 --- /dev/null +++ b/apps/emqx_bridge_es/rebar.config @@ -0,0 +1,15 @@ +%% -*- mode: erlang -*- + +{erl_opts, [ + debug_info +]}. + +{deps, [ + {emqx, {path, "../../apps/emqx"}}, + {emqx_connector, {path, "../../apps/emqx_connector"}}, + {emqx_resource, {path, "../../apps/emqx_resource"}}, + {emqx_bridge, {path, "../../apps/emqx_bridge"}}, + {emqx_bridge_http, {path, "../emqx_bridge_http"}} +]}. +{plugins, [rebar3_path_deps]}. +{project_plugins, [erlfmt]}. diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es.app.src b/apps/emqx_bridge_es/src/emqx_bridge_es.app.src new file mode 100644 index 000000000..9e98cd33e --- /dev/null +++ b/apps/emqx_bridge_es/src/emqx_bridge_es.app.src @@ -0,0 +1,23 @@ +%% -*- mode: erlang -*- +{application, emqx_bridge_es, [ + {description, "EMQX Enterprise Elastic Search Bridge"}, + {vsn, "0.1.0"}, + {modules, [ + emqx_bridge_es, + emqx_bridge_es_connector + ]}, + {registered, []}, + {applications, [ + kernel, + stdlib, + emqx_resource, + emqx_connector + ]}, + {env, []}, + {licenses, ["Business Source License 1.1"]}, + {maintainers, ["EMQX Team "]}, + {links, [ + {"Homepage", "https://emqx.io/"}, + {"Github", "https://github.com/emqx/emqx"} + ]} +]}. diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es.erl b/apps/emqx_bridge_es/src/emqx_bridge_es.erl new file mode 100644 index 000000000..57ab648b5 --- /dev/null +++ b/apps/emqx_bridge_es/src/emqx_bridge_es.erl @@ -0,0 +1,312 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_es). + +-include("emqx_bridge_es.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). + +-export([bridge_v2_examples/1]). + +%% hocon_schema API +-export([namespace/0, roots/0, fields/1, desc/1]). + +-define(CONNECTOR_TYPE, elasticsearch). +-define(ACTION_TYPE, ?CONNECTOR_TYPE). + +namespace() -> "bridge_elasticsearch". + +roots() -> []. + +fields(action) -> + {elasticsearch, + ?HOCON( + ?MAP(action_name, ?R_REF(action_config)), + #{ + desc => <<"ElasticSearch Action Config">>, + required => false + } + )}; +fields(action_config) -> + emqx_resource_schema:override( + emqx_bridge_v2_schema:make_producer_action_schema( + ?HOCON( + ?R_REF(action_parameters), + #{ + required => true, desc => ?DESC("action_parameters") + } + ) + ), + [ + {resource_opts, + ?HOCON(?R_REF(action_resource_opts), #{ + default => #{}, + desc => ?DESC(emqx_resource_schema, "resource_opts") + })} + ] + ); +fields(action_resource_opts) -> + lists:filter( + fun({K, _V}) -> + not lists:member(K, unsupported_opts()) + end, + emqx_bridge_v2_schema:resource_opts_fields() + ); +fields(action_parameters) -> + [ + {target, + ?HOCON( + binary(), + #{ + desc => ?DESC("config_target"), + required => false + } + )}, + {require_alias, + ?HOCON( + boolean(), + #{ + required => false, + default => false, + desc => ?DESC("config_require_alias") + } + )}, + {routing, + ?HOCON( + binary(), + #{ + required => false, + desc => ?DESC("config_routing") + } + )}, + {wait_for_active_shards, + ?HOCON( + ?UNION([pos_integer(), all]), + #{ + required => false, + desc => ?DESC("config_wait_for_active_shards") + } + )}, + {data, + ?HOCON( + ?ARRAY( + ?UNION( + [ + ?R_REF(create), + ?R_REF(delete), + ?R_REF(index), + ?R_REF(update) + ] + ) + ), + #{ + desc => ?DESC("action_parameters_data") + } + )} + ] ++ + lists:filter( + fun({K, _}) -> + not lists:member(K, [path, method, body, headers, request_timeout]) + end, + emqx_bridge_http_schema:fields("parameters_opts") + ); +fields(Action) when Action =:= create; Action =:= index -> + [ + {action, + ?HOCON( + Action, + #{ + desc => atom_to_binary(Action), + required => true + } + )}, + {'_index', + ?HOCON( + binary(), + #{ + required => false, + desc => ?DESC("config_parameters_index") + } + )}, + {'_id', + ?HOCON( + binary(), + #{ + required => false, + desc => ?DESC("config_parameters_id") + } + )}, + {require_alias, + ?HOCON( + binary(), + #{ + required => false, + desc => ?DESC("config_parameters_require_alias") + } + )}, + {fields, + ?HOCON( + binary(), + #{ + required => true, + desc => ?DESC("config_parameters_fields") + } + )} + ]; +fields(delete) -> + [ + {action, + ?HOCON( + delete, + #{ + desc => <<"Delete">>, + required => true + } + )}, + {'_index', + ?HOCON( + binary(), + #{ + required => false, + desc => ?DESC("config_parameters_index") + } + )}, + {'_id', + ?HOCON( + binary(), + #{ + required => true, + desc => ?DESC("config_parameters_id") + } + )}, + {require_alias, + ?HOCON( + binary(), + #{ + required => false, + desc => ?DESC("config_parameters_require_alias") + } + )} + ]; +fields(update) -> + [ + {action, + ?HOCON( + update, + #{ + desc => <<"Update">>, + required => true + } + )}, + {doc_as_upsert, + ?HOCON( + binary(), + #{ + required => false, + desc => ?DESC("config_parameters_doc_as_upsert") + } + )}, + {upsert, + ?HOCON( + binary(), + #{ + required => false, + desc => ?DESC("config_parameters_upsert") + } + )}, + {'_index', + ?HOCON( + binary(), + #{ + required => false, + desc => ?DESC("config_parameters_index") + } + )}, + {'_id', + ?HOCON( + binary(), + #{ + required => true, + desc => ?DESC("config_parameters_id") + } + )}, + {require_alias, + ?HOCON( + binary(), + #{ + required => false, + desc => ?DESC("config_parameters_require_alias") + } + )}, + {fields, + ?HOCON( + binary(), + #{ + required => true, + desc => ?DESC("config_parameters_fields") + } + )} + ]; +fields("post_bridge_v2") -> + emqx_bridge_schema:type_and_name_fields(elasticsearch) ++ fields(action_config); +fields("put_bridge_v2") -> + fields(action_config); +fields("get_bridge_v2") -> + emqx_bridge_schema:status_fields() ++ fields("post_bridge_v2"). + +bridge_v2_examples(Method) -> + [ + #{ + <<"elasticsearch">> => + #{ + summary => <<"Elastic Search Bridge">>, + value => emqx_bridge_v2_schema:action_values( + Method, ?ACTION_TYPE, ?CONNECTOR_TYPE, action_values() + ) + } + } + ]. + +action_values() -> + #{ + parameters => #{ + target => <<"${target_index}">>, + data => [ + #{ + action => index, + '_index' => <<"${index}">>, + fields => <<"${fields}">>, + require_alias => <<"${require_alias}">> + }, + #{ + action => create, + '_index' => <<"${index}">>, + fields => <<"${fields}">> + }, + #{ + action => delete, + '_index' => <<"${index}">>, + '_id' => <<"${id}">> + }, + #{ + action => update, + '_index' => <<"${index}">>, + '_id' => <<"${id}">>, + fields => <<"${fields}">>, + require_alias => false, + doc_as_upsert => <<"${doc_as_upsert}">>, + upsert => <<"${upsert}">> + } + ] + } + }. + +unsupported_opts() -> + [ + batch_size, + batch_time + ]. + +desc(_) -> undefined. diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es_action_info.erl b/apps/emqx_bridge_es/src/emqx_bridge_es_action_info.erl new file mode 100644 index 000000000..b2f2ff777 --- /dev/null +++ b/apps/emqx_bridge_es/src/emqx_bridge_es_action_info.erl @@ -0,0 +1,22 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_es_action_info). + +-behaviour(emqx_action_info). + +-elvis([{elvis_style, invalid_dynamic_call, disable}]). + +%% behaviour callbacks +-export([ + action_type_name/0, + connector_type_name/0, + schema_module/0 +]). + +-define(ACTION_TYPE, elasticsearch). + +action_type_name() -> ?ACTION_TYPE. +connector_type_name() -> ?ACTION_TYPE. + +schema_module() -> emqx_bridge_es. diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl b/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl new file mode 100644 index 000000000..22509e037 --- /dev/null +++ b/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl @@ -0,0 +1,498 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_es_connector). + +-behaviour(emqx_resource). + +-include("emqx_bridge_es.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +%% `emqx_resource' API +-export([ + callback_mode/0, + on_start/2, + on_stop/2, + on_get_status/2, + on_query/3, + on_query_async/4, + on_add_channel/4, + on_remove_channel/3, + on_get_channels/1, + on_get_channel_status/3 +]). + +-export([ + namespace/0, + roots/0, + fields/1, + desc/1, + connector_examples/1, + connector_example_values/0 +]). + +%% emqx_connector_resource behaviour callbacks +-export([connector_config/2]). + +-type config() :: + #{ + base_url := #{ + scheme := http | https, + host := iolist(), + port := inet:port_number(), + path := _ + }, + connect_timeout := pos_integer(), + pool_type := random | hash, + pool_size := pos_integer(), + request => undefined | map(), + atom() => _ + }. + +-type state() :: + #{ + base_path := _, + connect_timeout := pos_integer(), + pool_type := random | hash, + channels := map(), + request => undefined | map(), + atom() => _ + }. + +-type manager_id() :: binary(). + +-define(CONNECTOR_TYPE, elasticsearch). + +%%------------------------------------------------------------------------------------- +%% connector examples +%%------------------------------------------------------------------------------------- +connector_examples(Method) -> + [ + #{ + <<"elasticsearch">> => + #{ + summary => <<"Elastic Search Connector">>, + value => emqx_connector_schema:connector_values( + Method, ?CONNECTOR_TYPE, connector_example_values() + ) + } + } + ]. + +connector_example_values() -> + #{ + name => <<"elasticsearch_connector">>, + type => elasticsearch, + enable => true, + authentication => #{ + <<"username">> => <<"root">>, + <<"password">> => <<"******">> + }, + base_url => <<"http://127.0.0.1:9200/">>, + connect_timeout => <<"15s">>, + pool_type => <<"random">>, + pool_size => 8, + enable_pipelining => 100, + ssl => #{enable => false} + }. + +%%------------------------------------------------------------------------------------- +%% schema +%%------------------------------------------------------------------------------------- +namespace() -> "elasticsearch". + +roots() -> + [{config, #{type => ?R_REF(config)}}]. + +fields(config) -> + lists:filter( + fun({K, _}) -> not lists:member(K, [url, request, retry_interval, headers]) end, + emqx_bridge_http_schema:fields("config_connector") + ) ++ + fields("connection_fields"); +fields("connection_fields") -> + [ + {base_url, + ?HOCON( + emqx_schema:url(), + #{ + required => true, + desc => ?DESC(emqx_bridge_es, "config_base_url") + } + )}, + {authentication, + ?HOCON( + ?UNION([?R_REF(auth_basic)]), + #{ + desc => ?DESC("config_authentication") + } + )} + ]; +fields(auth_basic) -> + [ + {username, + ?HOCON(binary(), #{ + required => true, + desc => ?DESC("config_auth_basic_username") + })}, + {password, + emqx_schema_secret:mk(#{ + required => true, + desc => ?DESC("config_auth_basic_password") + })} + ]; +fields("post") -> + emqx_connector_schema:type_and_name_fields(elasticsearch) ++ fields(config); +fields("put") -> + fields(config); +fields("get") -> + emqx_bridge_schema:status_fields() ++ fields("post"). + +desc(config) -> + ?DESC("desc_config"); +desc(auth_basic) -> + "Basic Authentication"; +desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> + ["Configuration for Elastic Search using `", string:to_upper(Method), "` method."]; +desc(_) -> + undefined. + +connector_config(Conf, #{name := Name, parse_confs := ParseConfs}) -> + #{ + base_url := BaseUrl, + authentication := + #{ + username := Username, + password := Password0 + } + } = Conf, + + Password = emqx_secret:unwrap(Password0), + Base64 = base64:encode(<>), + BasicToken = <<"Basic ", Base64/binary>>, + + WebhookConfig = + Conf#{ + method => <<"post">>, + url => BaseUrl, + headers => [ + {<<"Content-type">>, <<"application/json">>}, + {<<"Authorization">>, BasicToken} + ] + }, + ParseConfs( + <<"http">>, + Name, + WebhookConfig + ). + +%%------------------------------------------------------------------------------------- +%% `emqx_resource' API +%%------------------------------------------------------------------------------------- +callback_mode() -> async_if_possible. + +-spec on_start(manager_id(), config()) -> {ok, state()} | no_return(). +on_start(InstanceId, Config) -> + case emqx_bridge_http_connector:on_start(InstanceId, Config) of + {ok, State} -> + ?SLOG(info, #{ + msg => "elasticsearch_bridge_started", + instance_id => InstanceId, + request => maps:get(request, State, <<>>) + }), + ?tp(elasticsearch_bridge_started, #{instance_id => InstanceId}), + {ok, State#{channels => #{}}}; + {error, Reason} -> + ?SLOG(error, #{ + msg => "failed_to_start_elasticsearch_bridge", + instance_id => InstanceId, + request => maps:get(request, Config, <<>>), + reason => Reason + }), + throw(failed_to_start_elasticsearch_bridge) + end. + +-spec on_stop(manager_id(), state()) -> ok | {error, term()}. +on_stop(InstanceId, State) -> + ?SLOG(info, #{ + msg => "stopping_elasticsearch_bridge", + connector => InstanceId + }), + Res = emqx_bridge_http_connector:on_stop(InstanceId, State), + ?tp(elasticsearch_bridge_stopped, #{instance_id => InstanceId}), + Res. + +-spec on_get_status(manager_id(), state()) -> + {connected, state()} | {disconnected, state(), term()}. +on_get_status(InstanceId, State) -> + emqx_bridge_http_connector:on_get_status(InstanceId, State). + +-spec on_query(manager_id(), tuple(), state()) -> + {ok, pos_integer(), [term()], term()} + | {ok, pos_integer(), [term()]} + | {error, term()}. +on_query(InstanceId, {ChannelId, Msg} = Req, #{channels := Channels} = State) -> + ?tp(elasticsearch_bridge_on_query, #{instance_id => InstanceId}), + ?SLOG(debug, #{ + msg => "elasticsearch_bridge_on_query_called", + instance_id => InstanceId, + send_message => Req, + state => emqx_utils:redact(State) + }), + case try_render_message(Req, Channels) of + {ok, Body} -> + handle_response( + emqx_bridge_http_connector:on_query( + InstanceId, {ChannelId, {Msg, Body}}, State + ) + ); + Error -> + Error + end. + +-spec on_query_async(manager_id(), tuple(), {function(), [term()]}, state()) -> + {ok, pid()} | {error, empty_request}. +on_query_async( + InstanceId, {ChannelId, Msg} = Req, ReplyFunAndArgs0, #{channels := Channels} = State +) -> + ?tp(elasticsearch_bridge_on_query_async, #{instance_id => InstanceId}), + ?SLOG(debug, #{ + msg => "elasticsearch_bridge_on_query_async_called", + instance_id => InstanceId, + send_message => Req, + state => emqx_utils:redact(State) + }), + case try_render_message(Req, Channels) of + {ok, Payload} -> + ReplyFunAndArgs = + { + fun(Result) -> + Response = handle_response(Result), + emqx_resource:apply_reply_fun(ReplyFunAndArgs0, Response) + end, + [] + }, + emqx_bridge_http_connector:on_query_async( + InstanceId, {ChannelId, {Msg, Payload}}, ReplyFunAndArgs, State + ); + Error -> + Error + end. + +on_add_channel( + InstanceId, + #{channels := Channels} = State0, + ChannelId, + #{parameters := Parameter} +) -> + case maps:is_key(ChannelId, Channels) of + true -> + {error, already_exists}; + _ -> + #{data := Data} = Parameter, + Parameter1 = Parameter#{path => path(Parameter), method => <<"post">>}, + {ok, State} = emqx_bridge_http_connector:on_add_channel( + InstanceId, State0, ChannelId, #{parameters => Parameter1} + ), + case preproc_data_template(Data) of + [] -> + {error, invalid_data}; + DataTemplate -> + Channel = Parameter1#{data => DataTemplate}, + Channels2 = Channels#{ChannelId => Channel}, + {ok, State#{channels => Channels2}} + end + end. + +on_remove_channel(InstanceId, #{channels := Channels} = OldState0, ChannelId) -> + {ok, OldState} = emqx_bridge_http_connector:on_remove_channel(InstanceId, OldState0, ChannelId), + Channels2 = maps:remove(ChannelId, Channels), + {ok, OldState#{channels => Channels2}}. + +on_get_channels(InstanceId) -> + emqx_bridge_v2:get_channels_for_connector(InstanceId). + +on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) -> + case maps:is_key(ChannelId, Channels) of + true -> + connected; + _ -> + {error, not_exists} + end. + +%%-------------------------------------------------------------------- +%% Internal Functions +%%-------------------------------------------------------------------- +path(Param) -> + Target = maps:get(target, Param, undefined), + QString0 = maps:fold( + fun(K, V, Acc) -> + [[atom_to_list(K), "=", to_str(V)] | Acc] + end, + [["_source=false"], ["filter_path=items.*.error"]], + maps:with([require_alias, routing, wait_for_active_shards], Param) + ), + QString = "?" ++ lists:join("&", QString0), + target(Target) ++ QString. + +target(undefined) -> "/_bulk"; +target(Str) -> "/" ++ binary_to_list(Str) ++ "/_bulk". + +to_str(List) when is_list(List) -> List; +to_str(false) -> "false"; +to_str(true) -> "true"; +to_str(Atom) when is_atom(Atom) -> atom_to_list(Atom). + +proc_data(DataList, Msg) when is_list(DataList) -> + [ + begin + proc_data(Data, Msg) + end + || Data <- DataList + ]; +proc_data( + #{ + action := Action, + '_index' := IndexT, + '_id' := IdT, + require_alias := RequiredAliasT, + fields := FieldsT + }, + Msg +) when Action =:= create; Action =:= index -> + [ + emqx_utils_json:encode( + #{ + Action => filter([ + {'_index', emqx_placeholder:proc_tmpl(IndexT, Msg)}, + {'_id', emqx_placeholder:proc_tmpl(IdT, Msg)}, + {required_alias, emqx_placeholder:proc_tmpl(RequiredAliasT, Msg)} + ]) + } + ), + "\n", + emqx_placeholder:proc_tmpl(FieldsT, Msg), + "\n" + ]; +proc_data( + #{ + action := delete, + '_index' := IndexT, + '_id' := IdT, + require_alias := RequiredAliasT + }, + Msg +) -> + [ + emqx_utils_json:encode( + #{ + delete => filter([ + {'_index', emqx_placeholder:proc_tmpl(IndexT, Msg)}, + {'_id', emqx_placeholder:proc_tmpl(IdT, Msg)}, + {required_alias, emqx_placeholder:proc_tmpl(RequiredAliasT, Msg)} + ]) + } + ), + "\n" + ]; +proc_data( + #{ + action := update, + '_index' := IndexT, + '_id' := IdT, + require_alias := RequiredAliasT, + doc_as_upsert := DocAsUpsert, + upsert := Upsert, + fields := FieldsT + }, + Msg +) -> + [ + emqx_utils_json:encode( + #{ + update => filter([ + {'_index', emqx_placeholder:proc_tmpl(IndexT, Msg)}, + {'_id', emqx_placeholder:proc_tmpl(IdT, Msg)}, + {required_alias, emqx_placeholder:proc_tmpl(RequiredAliasT, Msg)}, + {doc_as_upsert, emqx_placeholder:proc_tmpl(DocAsUpsert, Msg)}, + {upsert, emqx_placeholder:proc_tmpl(Upsert, Msg)} + ]) + } + ), + "\n{\"doc\":", + emqx_placeholder:proc_tmpl(FieldsT, Msg), + "}\n" + ]. + +filter(List) -> + Fun = fun + ({_K, V}) when V =:= undefined; V =:= <<"undefined">>; V =:= "undefined" -> + false; + ({_K, V}) when V =:= ""; V =:= <<>> -> + false; + ({_K, V}) when V =:= "false" -> {true, false}; + ({_K, V}) when V =:= "true" -> {true, true}; + ({_K, _V}) -> + true + end, + maps:from_list(lists:filtermap(Fun, List)). + +handle_response({ok, 200, _Headers, Body} = Resp) -> + eval_response_body(Body, Resp); +handle_response({ok, 200, Body} = Resp) -> + eval_response_body(Body, Resp); +handle_response({ok, Code, _Headers, Body}) -> + {error, #{code => Code, body => Body}}; +handle_response({ok, Code, Body}) -> + {error, #{code => Code, body => Body}}; +handle_response({error, _} = Error) -> + Error. + +eval_response_body(<<"{}">>, Resp) -> Resp; +eval_response_body(Body, _Resp) -> {error, emqx_utils_json:decode(Body)}. + +preproc_data_template(DataList) when is_list(DataList) -> + [ + begin + preproc_data_template(Data) + end + || Data <- DataList + ]; +preproc_data_template(#{action := create} = Data) -> + Index = maps:get('_index', Data, ""), + Id = maps:get('_id', Data, ""), + RequiredAlias = maps:get(require_alias, Data, ""), + Fields = maps:get(fields, Data, ""), + #{ + action => create, + '_index' => emqx_placeholder:preproc_tmpl(Index), + '_id' => emqx_placeholder:preproc_tmpl(Id), + require_alias => emqx_placeholder:preproc_tmpl(RequiredAlias), + fields => emqx_placeholder:preproc_tmpl(Fields) + }; +preproc_data_template(#{action := index} = Data) -> + Data1 = preproc_data_template(Data#{action => create}), + Data1#{action => index}; +preproc_data_template(#{action := delete} = Data) -> + Data1 = preproc_data_template(Data#{action => create}), + Data2 = Data1#{action => delete}, + maps:remove(fields, Data2); +preproc_data_template(#{action := update} = Data) -> + Data1 = preproc_data_template(Data#{action => index}), + DocAsUpsert = maps:get(doc_as_upsert, Data, ""), + Upsert = maps:get(upsert, Data, ""), + Data1#{ + action => update, + doc_as_upsert => emqx_placeholder:preproc_tmpl(DocAsUpsert), + upsert => emqx_placeholder:preproc_tmpl(Upsert) + }. + +try_render_message({ChannelId, Msg}, Channels) -> + case maps:find(ChannelId, Channels) of + {ok, #{data := Data}} -> + {ok, proc_data(Data, Msg)}; + _ -> + {error, {unrecoverable_error, {invalid_channel_id, ChannelId}}} + end. diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl index 51375fc04..f00ae8523 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl @@ -317,7 +317,7 @@ on_query(InstId, {send_message, Msg}, State) -> %% BridgeV2 entrypoint on_query( InstId, - {ActionId, Msg}, + {ActionId, MsgAndBody}, State = #{installed_actions := InstalledActions} ) when is_binary(ActionId) -> case {maps:get(request, State, undefined), maps:get(ActionId, InstalledActions, undefined)} of @@ -334,10 +334,10 @@ on_query( body := Body, headers := Headers, request_timeout := Timeout - } = process_request_and_action(Request, ActionState, Msg), + } = process_request_and_action(Request, ActionState, MsgAndBody), %% bridge buffer worker has retry, do not let ehttpc retry Retry = 2, - ClientId = maps:get(clientid, Msg, undefined), + ClientId = clientid(MsgAndBody), on_query( InstId, {ClientId, Method, {Path, Headers, Body}, Timeout, Retry}, @@ -430,7 +430,7 @@ on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) -> %% BridgeV2 entrypoint on_query_async( InstId, - {ActionId, Msg}, + {ActionId, MsgAndBody}, ReplyFunAndArgs, State = #{installed_actions := InstalledActions} ) when is_binary(ActionId) -> @@ -448,8 +448,8 @@ on_query_async( body := Body, headers := Headers, request_timeout := Timeout - } = process_request_and_action(Request, ActionState, Msg), - ClientId = maps:get(clientid, Msg, undefined), + } = process_request_and_action(Request, ActionState, MsgAndBody), + ClientId = clientid(MsgAndBody), on_query_async( InstId, {ClientId, Method, {Path, Headers, Body}, Timeout}, @@ -629,12 +629,9 @@ maybe_parse_template(Key, Conf) -> parse_template(String) -> emqx_template:parse(String). -process_request_and_action(Request, ActionState, Msg) -> +process_request_and_action(Request, ActionState, {Msg, Body}) -> MethodTemplate = maps:get(method, ActionState), Method = make_method(render_template_string(MethodTemplate, Msg)), - BodyTemplate = maps:get(body, ActionState), - Body = render_request_body(BodyTemplate, Msg), - PathPrefix = unicode:characters_to_list(render_template(maps:get(path, Request), Msg)), PathSuffix = unicode:characters_to_list(render_template(maps:get(path, ActionState), Msg)), @@ -656,7 +653,11 @@ process_request_and_action(Request, ActionState, Msg) -> body => Body, headers => Headers, request_timeout => maps:get(request_timeout, ActionState) - }. + }; +process_request_and_action(Request, ActionState, Msg) -> + BodyTemplate = maps:get(body, ActionState), + Body = render_request_body(BodyTemplate, Msg), + process_request_and_action(Request, ActionState, {Msg, Body}). merge_proplist(Proplist1, Proplist2) -> lists:foldl( @@ -732,7 +733,7 @@ formalize_request(_Method, BasePath, {Path, Headers}) -> %% because an HTTP server may handle paths like %% "/a/b/c/", "/a/b/c" and "/a//b/c" differently. %% -%% So we try to avoid unneccessary path normalization. +%% So we try to avoid unnecessary path normalization. %% %% See also: `join_paths_test_/0` join_paths(Path1, Path2) -> @@ -876,6 +877,9 @@ redact_request({Path, Headers}) -> redact_request({Path, Headers, _Body}) -> {Path, Headers, <<"******">>}. +clientid({Msg, _Body}) -> clientid(Msg); +clientid(Msg) -> maps:get(clientid, Msg, undefined). + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl index 4286a59e4..7fbcfc6db 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl @@ -90,7 +90,7 @@ connector_example_values() -> enable => true, authentication => #{ <<"username">> => <<"root">>, - <<"password">> => <<"*****">> + <<"password">> => <<"******">> }, base_url => <<"http://iotdb.local:18080/">>, connect_timeout => <<"15s">>, @@ -109,7 +109,10 @@ roots() -> [{config, #{type => hoconsc:ref(?MODULE, config)}}]. fields(config) -> - proplists_without([url, headers], emqx_bridge_http_schema:fields("config_connector")) ++ + proplists_without( + [url, request, retry_interval, headers], + emqx_bridge_http_schema:fields("config_connector") + ) ++ fields("connection_fields"); fields("connection_fields") -> [ @@ -206,7 +209,7 @@ on_start(InstanceId, Config) -> ?SLOG(error, #{ msg => "failed_to_start_iotdb_bridge", instance_id => InstanceId, - base_url => maps:get(request, Config, <<>>), + request => maps:get(request, Config, <<>>), reason => Reason }), throw(failed_to_start_iotdb_bridge) diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index 9e35015b0..822e8429b 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -50,6 +50,8 @@ resource_type(redis) -> emqx_bridge_redis_connector; resource_type(iotdb) -> emqx_bridge_iotdb_connector; +resource_type(elasticsearch) -> + emqx_bridge_es_connector; resource_type(Type) -> error({unknown_connector_type, Type}). @@ -62,6 +64,8 @@ connector_impl_module(confluent_producer) -> emqx_bridge_confluent_producer; connector_impl_module(iotdb) -> emqx_bridge_iotdb_connector; +connector_impl_module(elasticsearch) -> + emqx_bridge_es_connector; connector_impl_module(_ConnectorType) -> undefined. @@ -181,6 +185,14 @@ connector_structs() -> desc => <<"IoTDB Connector Config">>, required => false } + )}, + {elasticsearch, + mk( + hoconsc:map(name, ref(emqx_bridge_es_connector, config)), + #{ + desc => <<"Elastis Search Connector Config">>, + required => false + } )} ]. @@ -199,7 +211,8 @@ schema_modules() -> emqx_bridge_timescale, emqx_postgresql_connector_schema, emqx_bridge_redis_schema, - emqx_bridge_iotdb_connector + emqx_bridge_iotdb_connector, + emqx_bridge_es_connector ]. api_schemas(Method) -> @@ -227,7 +240,8 @@ api_schemas(Method) -> api_ref(emqx_bridge_timescale, <<"timescale">>, Method ++ "_connector"), api_ref(emqx_postgresql_connector_schema, <<"pgsql">>, Method ++ "_connector"), api_ref(emqx_bridge_redis_schema, <<"redis">>, Method ++ "_connector"), - api_ref(emqx_bridge_iotdb_connector, <<"iotdb">>, Method) + api_ref(emqx_bridge_iotdb_connector, <<"iotdb">>, Method), + api_ref(emqx_bridge_es_connector, <<"elasticsearch">>, Method) ]. api_ref(Module, Type, Method) -> diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index 0a3c9d744..b043ebacd 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -147,7 +147,9 @@ connector_type_to_bridge_types(syskeeper_proxy) -> connector_type_to_bridge_types(timescale) -> [timescale]; connector_type_to_bridge_types(iotdb) -> - [iotdb]. + [iotdb]; +connector_type_to_bridge_types(elasticsearch) -> + [elasticsearch]. actions_config_name() -> <<"actions">>. diff --git a/apps/emqx_machine/priv/reboot_lists.eterm b/apps/emqx_machine/priv/reboot_lists.eterm index cf3ad1523..f7e78c360 100644 --- a/apps/emqx_machine/priv/reboot_lists.eterm +++ b/apps/emqx_machine/priv/reboot_lists.eterm @@ -99,6 +99,7 @@ emqx_bridge_hstreamdb, emqx_bridge_influxdb, emqx_bridge_iotdb, + emqx_bridge_es, emqx_bridge_matrix, emqx_bridge_mongodb, emqx_bridge_mysql, diff --git a/apps/emqx_machine/src/emqx_machine.app.src b/apps/emqx_machine/src/emqx_machine.app.src index 6d7012313..320ddea02 100644 --- a/apps/emqx_machine/src/emqx_machine.app.src +++ b/apps/emqx_machine/src/emqx_machine.app.src @@ -3,7 +3,7 @@ {id, "emqx_machine"}, {description, "The EMQX Machine"}, % strict semver, bump manually! - {vsn, "0.2.17"}, + {vsn, "0.2.18"}, {modules, []}, {registered, []}, {applications, [kernel, stdlib, emqx_ctl]}, diff --git a/mix.exs b/mix.exs index de8195cfb..66889cebf 100644 --- a/mix.exs +++ b/mix.exs @@ -164,6 +164,7 @@ defmodule EMQXUmbrella.MixProject do :emqx_bridge_hstreamdb, :emqx_bridge_influxdb, :emqx_bridge_iotdb, + :emqx_bridge_es, :emqx_bridge_matrix, :emqx_bridge_mongodb, :emqx_bridge_mysql, diff --git a/rel/i18n/emqx_bridge_es.hocon b/rel/i18n/emqx_bridge_es.hocon new file mode 100644 index 000000000..78299c4ee --- /dev/null +++ b/rel/i18n/emqx_bridge_es.hocon @@ -0,0 +1,129 @@ +emqx_bridge_es { + +config_enable.desc: +"""Enable or disable this bridge""" + +config_enable.label: +"""Enable Or Disable Bridge""" + +config_authentication.desc: +"""Authentication configuration""" + +config_authentication.label: +"""Authentication""" + +auth_basic.desc: +"""Parameters for basic authentication.""" + +auth_basic.label: +"""Basic auth params""" + +config_auth_basic_username.desc: +"""The username as configured at the IoTDB REST interface""" + +config_auth_basic_username.label: + """HTTP Basic Auth Username""" + +config_auth_basic_password.desc: +"""The password as configured at the IoTDB REST interface""" + +config_auth_basic_password.label: +"""HTTP Basic Auth Password""" + +config_base_url.desc: +"""The base URL of the external ElasticSearch service's REST interface.""" +config_base_url.label: +"""ElasticSearch REST Service Base URL""" + +config_target.desc: +"""Name of the data stream, index, or index alias to perform bulk actions on""" + +config_target.label: +"""Target""" + +config_require_alias.desc: +"""If true, the request’s actions must target an index alias. Defaults to false""" +config_require_alias.label: +"""Require Alias""" + +config_routing.desc: +"""Custom value used to route operations to a specific shard.""" +config_routing.label: +"""Routing""" + +config_wait_for_active_shards.desc: +"""The number of shard copies that must be active before proceeding with the operation. +Set to all or any positive integer up to the total number of shards in the index (number_of_replicas+1). +Default: 1, the primary shard""" + +config_max_retries.desc: +"""HTTP request max retry times if failed.""" + +config_max_retries.label: +"""HTTP Request Max Retries""" + +desc_config.desc: +"""Configuration for Apache IoTDB bridge.""" + +desc_config.label: +"""IoTDB Bridge Configuration""" + +desc_name.desc: +"""Bridge name, used as a human-readable description of the bridge.""" + +desc_name.label: +"""Bridge Name""" + +config_parameters_action.desc: +"""TODO""" + +config_parameters_action.label: +"""Action""" + +config_parameters_index.desc: +"""Name of the data stream, index, or index alias to perform the action on. +This parameter is required if a is not specified in the request path.""" + +config_parameters_index.label: +"""_index""" + +config_parameters_id.desc: +"""The document ID. If no ID is specified, a document ID is automatically generated.""" +config_parameters_id.label: +"""_id""" + +config_parameters_require_alias.desc: +"""If true, the action must target an index alias. Defaults to false.""" +config_parameters_require_alias.label: +"""_require_alias""" + +config_parameters_fields.desc: +"""The document source to index. Required for create and index operations.""" +config_parameters_fields.label: +"""fields""" + +config_parameters_doc_as_upsert.desc: +"""Instead of sending a partial doc plus an upsert doc, you can set doc_as_upsert to true +to use the contents of doc as the upsert value.""" +config_parameters_doc_as_upsert.label: +"""doc_as_upsert""" + +config_parameters_upsert.desc: +"""If the document does not already exist, the contents of the upsert element are inserted as a new document.""" +config_parameters_upsert.label: +"""upsert""" + + +action_parameters_data.desc: +"""ElasticSearch action parameter data""" + +action_parameters_data.label: +"""Parameter Data""" + +action_parameters.desc: +"""ElasticSearch action parameters""" + +action_parameters.label: +"""Parameters""" + +} diff --git a/rel/i18n/emqx_bridge_es_connector.hocon b/rel/i18n/emqx_bridge_es_connector.hocon new file mode 100644 index 000000000..f980b3aca --- /dev/null +++ b/rel/i18n/emqx_bridge_es_connector.hocon @@ -0,0 +1,44 @@ +emqx_bridge_es_connector { + +config_authentication.desc: +"""Authentication configuration""" + +config_authentication.label: +"""Authentication""" + +auth_basic.desc: +"""Parameters for basic authentication.""" + +auth_basic.label: +"""Basic auth params""" + +config_auth_basic_username.desc: +"""The username as configured at the ElasticSearch REST interface""" + +config_auth_basic_username.label: + """HTTP Basic Auth Username""" + +config_auth_basic_password.desc: +"""The password as configured at the ElasticSearch REST interface""" + +config_auth_basic_password.label: +"""HTTP Basic Auth Password""" + +config_base_url.desc: +"""The base URL of the external ElasticSearch service's REST interface.""" +config_base_url.label: +"""ElasticSearch REST Service Base URL""" + +config_max_retries.desc: +"""HTTP request max retry times if failed.""" + +config_max_retries.label: +"""HTTP Request Max Retries""" + +desc_config.desc: +"""Configuration for ElasticSearch bridge.""" + +desc_config.label: +"""ElasticSearch Bridge Configuration""" + +}