diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl index 74239ffc0..aa06b547d 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl @@ -361,11 +361,16 @@ is_bad_schema(#{type := ?MAP(_, ?R_REF(Module, TypeName))}) -> [] -> false; _ -> - {true, #{ - schema_module => Module, - type_name => TypeName, - missing_fields => MissingFields - }} + %% elasticsearch is new and doesn't have local_topic + case MissingFields of + [local_topic] when Module =:= emqx_bridge_es -> false; + _ -> + {true, #{ + schema_module => Module, + type_name => TypeName, + missing_fields => MissingFields + }} + end end. -endif. diff --git a/apps/emqx_bridge_es/.gitignore b/apps/emqx_bridge_es/.gitignore deleted file mode 100644 index e9bc1c544..000000000 --- a/apps/emqx_bridge_es/.gitignore +++ /dev/null @@ -1,19 +0,0 @@ -.rebar3 - _* - .eunit - *.o - *.beam - *.plt - *.swp - *.swo - .erlang.cookie - ebin - log - erl_crash.dump - .rebar - logs - _build - .idea - *.iml - rebar3.crashdump - *~ diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es.erl b/apps/emqx_bridge_es/src/emqx_bridge_es.erl index 57ab648b5..b575f32ed 100644 --- a/apps/emqx_bridge_es/src/emqx_bridge_es.erl +++ b/apps/emqx_bridge_es/src/emqx_bridge_es.erl @@ -25,15 +25,15 @@ fields(action) -> ?HOCON( ?MAP(action_name, ?R_REF(action_config)), #{ - desc => <<"ElasticSearch Action Config">>, - required => false + required => false, + desc => ?DESC(elasticsearch) } )}; fields(action_config) -> emqx_resource_schema:override( - emqx_bridge_v2_schema:make_producer_action_schema( + emqx_bridge_v2_schema:make_consumer_action_schema( ?HOCON( - ?R_REF(action_parameters), + ?UNION(fun action_union_member_selector/1), #{ required => true, desc => ?DESC("action_parameters") } @@ -54,200 +54,28 @@ fields(action_resource_opts) -> end, emqx_bridge_v2_schema:resource_opts_fields() ); -fields(action_parameters) -> +fields(action_create) -> [ - {target, - ?HOCON( - binary(), - #{ - desc => ?DESC("config_target"), - required => false - } - )}, - {require_alias, - ?HOCON( - boolean(), - #{ - required => false, - default => false, - desc => ?DESC("config_require_alias") - } - )}, - {routing, - ?HOCON( - binary(), - #{ - required => false, - desc => ?DESC("config_routing") - } - )}, - {wait_for_active_shards, - ?HOCON( - ?UNION([pos_integer(), all]), - #{ - required => false, - desc => ?DESC("config_wait_for_active_shards") - } - )}, - {data, - ?HOCON( - ?ARRAY( - ?UNION( - [ - ?R_REF(create), - ?R_REF(delete), - ?R_REF(index), - ?R_REF(update) - ] - ) - ), - #{ - desc => ?DESC("action_parameters_data") - } - )} - ] ++ - lists:filter( - fun({K, _}) -> - not lists:member(K, [path, method, body, headers, request_timeout]) - end, - emqx_bridge_http_schema:fields("parameters_opts") - ); -fields(Action) when Action =:= create; Action =:= index -> - [ - {action, - ?HOCON( - Action, - #{ - desc => atom_to_binary(Action), - required => true - } - )}, - {'_index', - ?HOCON( - binary(), - #{ - required => false, - desc => ?DESC("config_parameters_index") - } - )}, - {'_id', - ?HOCON( - binary(), - #{ - required => false, - desc => ?DESC("config_parameters_id") - } - )}, - {require_alias, - ?HOCON( - binary(), - #{ - required => false, - desc => ?DESC("config_parameters_require_alias") - } - )}, - {fields, - ?HOCON( - binary(), - #{ - required => true, - desc => ?DESC("config_parameters_fields") - } - )} + action(create), + index(), + id(false), + doc(true), + routing(), + require_alias(), + overwrite() + | http_common_opts() ]; -fields(delete) -> +fields(action_delete) -> + [action(delete), index(), id(true), routing() | http_common_opts()]; +fields(action_update) -> [ - {action, - ?HOCON( - delete, - #{ - desc => <<"Delete">>, - required => true - } - )}, - {'_index', - ?HOCON( - binary(), - #{ - required => false, - desc => ?DESC("config_parameters_index") - } - )}, - {'_id', - ?HOCON( - binary(), - #{ - required => true, - desc => ?DESC("config_parameters_id") - } - )}, - {require_alias, - ?HOCON( - binary(), - #{ - required => false, - desc => ?DESC("config_parameters_require_alias") - } - )} - ]; -fields(update) -> - [ - {action, - ?HOCON( - update, - #{ - desc => <<"Update">>, - required => true - } - )}, - {doc_as_upsert, - ?HOCON( - binary(), - #{ - required => false, - desc => ?DESC("config_parameters_doc_as_upsert") - } - )}, - {upsert, - ?HOCON( - binary(), - #{ - required => false, - desc => ?DESC("config_parameters_upsert") - } - )}, - {'_index', - ?HOCON( - binary(), - #{ - required => false, - desc => ?DESC("config_parameters_index") - } - )}, - {'_id', - ?HOCON( - binary(), - #{ - required => true, - desc => ?DESC("config_parameters_id") - } - )}, - {require_alias, - ?HOCON( - binary(), - #{ - required => false, - desc => ?DESC("config_parameters_require_alias") - } - )}, - {fields, - ?HOCON( - binary(), - #{ - required => true, - desc => ?DESC("config_parameters_fields") - } - )} + action(update), + index(), + id(true), + doc(true), + routing(), + require_alias() + | http_common_opts() ]; fields("post_bridge_v2") -> emqx_bridge_schema:type_and_name_fields(elasticsearch) ++ fields(action_config); @@ -256,6 +84,111 @@ fields("put_bridge_v2") -> fields("get_bridge_v2") -> emqx_bridge_schema:status_fields() ++ fields("post_bridge_v2"). +action_union_member_selector(all_union_members) -> + [ + ?R_REF(action_create), + ?R_REF(action_delete), + ?R_REF(action_update) + ]; +action_union_member_selector({value, Value}) -> + case Value of + #{<<"action">> := <<"create">>} -> + [?R_REF(action_create)]; + #{<<"action">> := <<"delete">>} -> + [?R_REF(action_delete)]; + #{<<"action">> := <<"update">>} -> + [?R_REF(action_update)]; + _ -> + Expected = "create | delete | update", + throw(#{ + field_name => action, + expected => Expected + }) + end. + +action(Action) -> + {action, + ?HOCON( + Action, + #{ + required => true, + desc => atom_to_binary(Action) + } + )}. + +overwrite() -> + {overwrite, + ?HOCON( + boolean(), + #{ + required => false, + default => true, + desc => ?DESC("config_overwrite") + } + )}. + +index() -> + {index, + ?HOCON( + binary(), + #{ + required => true, + example => <<"${payload.index}">>, + desc => ?DESC("config_parameters_index") + } + )}. + +id(Required) -> + {id, + ?HOCON( + binary(), + #{ + required => Required, + example => <<"${payload.id}">>, + desc => ?DESC("config_parameters_id") + } + )}. + +doc(Required) -> + {doc, + ?HOCON( + binary(), + #{ + required => Required, + example => <<"${payload.doc}">>, + desc => ?DESC("config_parameters_doc") + } + )}. + +http_common_opts() -> + lists:filter( + fun({K, _}) -> + not lists:member(K, [path, method, body, headers, request_timeout]) + end, + emqx_bridge_http_schema:fields("parameters_opts") + ). + +routing() -> + {routing, + ?HOCON( + binary(), + #{ + required => false, + example => <<"${payload.routing}">>, + desc => ?DESC("config_routing") + } + )}. + +require_alias() -> + {require_alias, + ?HOCON( + boolean(), + #{ + required => false, + desc => ?DESC("config_require_alias") + } + )}. + bridge_v2_examples(Method) -> [ #{ @@ -272,34 +205,10 @@ bridge_v2_examples(Method) -> action_values() -> #{ parameters => #{ - target => <<"${target_index}">>, - data => [ - #{ - action => index, - '_index' => <<"${index}">>, - fields => <<"${fields}">>, - require_alias => <<"${require_alias}">> - }, - #{ - action => create, - '_index' => <<"${index}">>, - fields => <<"${fields}">> - }, - #{ - action => delete, - '_index' => <<"${index}">>, - '_id' => <<"${id}">> - }, - #{ - action => update, - '_index' => <<"${index}">>, - '_id' => <<"${id}">>, - fields => <<"${fields}">>, - require_alias => false, - doc_as_upsert => <<"${doc_as_upsert}">>, - upsert => <<"${upsert}">> - } - ] + action => create, + index => <<"${payload.index}">>, + overwrite => true, + doc => <<"${payload.doc}">> } }. @@ -309,4 +218,10 @@ unsupported_opts() -> batch_time ]. +desc(elasticsearch) -> ?DESC(elasticsearch); +desc(action_config) -> ?DESC(action_config); +desc(action_create) -> ?DESC(action_create); +desc(action_delete) -> ?DESC(action_delete); +desc(action_update) -> ?DESC(action_update); +desc(action_resource_opts) -> ?DESC(action_resource_opts); desc(_) -> undefined. diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl b/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl index 22509e037..fe86eac56 100644 --- a/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl +++ b/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl @@ -233,7 +233,7 @@ on_get_status(InstanceId, State) -> {ok, pos_integer(), [term()], term()} | {ok, pos_integer(), [term()]} | {error, term()}. -on_query(InstanceId, {ChannelId, Msg} = Req, #{channels := Channels} = State) -> +on_query(InstanceId, {ChannelId, Msg} = Req, State) -> ?tp(elasticsearch_bridge_on_query, #{instance_id => InstanceId}), ?SLOG(debug, #{ msg => "elasticsearch_bridge_on_query_called", @@ -241,21 +241,16 @@ on_query(InstanceId, {ChannelId, Msg} = Req, #{channels := Channels} = State) -> send_message => Req, state => emqx_utils:redact(State) }), - case try_render_message(Req, Channels) of - {ok, Body} -> - handle_response( - emqx_bridge_http_connector:on_query( - InstanceId, {ChannelId, {Msg, Body}}, State - ) - ); - Error -> - Error - end. + handle_response( + emqx_bridge_http_connector:on_query( + InstanceId, {ChannelId, Msg}, State + ) + ). -spec on_query_async(manager_id(), tuple(), {function(), [term()]}, state()) -> {ok, pid()} | {error, empty_request}. on_query_async( - InstanceId, {ChannelId, Msg} = Req, ReplyFunAndArgs0, #{channels := Channels} = State + InstanceId, {ChannelId, Msg} = Req, ReplyFunAndArgs0, State ) -> ?tp(elasticsearch_bridge_on_query_async, #{instance_id => InstanceId}), ?SLOG(debug, #{ @@ -264,22 +259,17 @@ on_query_async( send_message => Req, state => emqx_utils:redact(State) }), - case try_render_message(Req, Channels) of - {ok, Payload} -> - ReplyFunAndArgs = - { - fun(Result) -> - Response = handle_response(Result), - emqx_resource:apply_reply_fun(ReplyFunAndArgs0, Response) - end, - [] - }, - emqx_bridge_http_connector:on_query_async( - InstanceId, {ChannelId, {Msg, Payload}}, ReplyFunAndArgs, State - ); - Error -> - Error - end. + ReplyFunAndArgs = + { + fun(Result) -> + Response = handle_response(Result), + emqx_resource:apply_reply_fun(ReplyFunAndArgs0, Response) + end, + [] + }, + emqx_bridge_http_connector:on_query_async( + InstanceId, {ChannelId, Msg}, ReplyFunAndArgs, State + ). on_add_channel( InstanceId, @@ -291,19 +281,17 @@ on_add_channel( true -> {error, already_exists}; _ -> - #{data := Data} = Parameter, - Parameter1 = Parameter#{path => path(Parameter), method => <<"post">>}, + Parameter1 = Parameter#{ + path => path(Parameter), + method => method(Parameter), + body => get_body_template(Parameter) + }, {ok, State} = emqx_bridge_http_connector:on_add_channel( InstanceId, State0, ChannelId, #{parameters => Parameter1} ), - case preproc_data_template(Data) of - [] -> - {error, invalid_data}; - DataTemplate -> - Channel = Parameter1#{data => DataTemplate}, - Channels2 = Channels#{ChannelId => Channel}, - {ok, State#{channels => Channels2}} - end + Channel = Parameter1, + Channels2 = Channels#{ChannelId => Channel}, + {ok, State#{channels => Channels2}} end. on_remove_channel(InstanceId, #{channels := Channels} = OldState0, ChannelId) -> @@ -325,124 +313,55 @@ on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) -> %%-------------------------------------------------------------------- %% Internal Functions %%-------------------------------------------------------------------- -path(Param) -> - Target = maps:get(target, Param, undefined), - QString0 = maps:fold( - fun(K, V, Acc) -> - [[atom_to_list(K), "=", to_str(V)] | Acc] +%% delete DELETE //_doc/<_id> +path(#{action := delete, id := Id, index := Index} = Action) -> + BasePath = ["/", Index, "/_doc/", Id], + Qs = add_query_string([routing], Action), + BasePath ++ Qs; +%% update POST //_update/<_id> +path(#{action := update, id := Id, index := Index} = Action) -> + BasePath = ["/", Index, "/_update/", Id], + Qs = add_query_string([routing, require_alias], Action), + BasePath ++ Qs; +%% create with id //_doc/_id +path(#{action := create, index := Index, id := Id} = Action) -> + BasePath = ["/", Index, "/_doc/", Id], + Qs = + case maps:get(overwrite, Action, true) of + true -> + add_query_string([routing, require_alias], Action); + false -> + Action1 = Action#{op_type => "create"}, + add_query_string([routing, require_alias, op_type], Action1) end, - [["_source=false"], ["filter_path=items.*.error"]], - maps:with([require_alias, routing, wait_for_active_shards], Param) - ), - QString = "?" ++ lists:join("&", QString0), - target(Target) ++ QString. + BasePath ++ Qs; +%% create without id POST //_doc/ +path(#{action := create, index := Index} = Action) -> + BasePath = ["/", Index, "/_doc/"], + Qs = add_query_string([routing, require_alias], Action), + BasePath ++ Qs. -target(undefined) -> "/_bulk"; -target(Str) -> "/" ++ binary_to_list(Str) ++ "/_bulk". +method(#{action := create}) -> <<"POST">>; +method(#{action := delete}) -> <<"DELETE">>; +method(#{action := update}) -> <<"POST">>. + +add_query_string(Keys, Param0) -> + Param1 = maps:with(Keys, Param0), + FoldFun = fun(K, V, Acc) -> [[atom_to_list(K), "=", to_str(V)] | Acc] end, + case maps:fold(FoldFun, [], Param1) of + "" -> ""; + QString -> "?" ++ lists:join("&", QString) + end. to_str(List) when is_list(List) -> List; to_str(false) -> "false"; to_str(true) -> "true"; to_str(Atom) when is_atom(Atom) -> atom_to_list(Atom). -proc_data(DataList, Msg) when is_list(DataList) -> - [ - begin - proc_data(Data, Msg) - end - || Data <- DataList - ]; -proc_data( - #{ - action := Action, - '_index' := IndexT, - '_id' := IdT, - require_alias := RequiredAliasT, - fields := FieldsT - }, - Msg -) when Action =:= create; Action =:= index -> - [ - emqx_utils_json:encode( - #{ - Action => filter([ - {'_index', emqx_placeholder:proc_tmpl(IndexT, Msg)}, - {'_id', emqx_placeholder:proc_tmpl(IdT, Msg)}, - {required_alias, emqx_placeholder:proc_tmpl(RequiredAliasT, Msg)} - ]) - } - ), - "\n", - emqx_placeholder:proc_tmpl(FieldsT, Msg), - "\n" - ]; -proc_data( - #{ - action := delete, - '_index' := IndexT, - '_id' := IdT, - require_alias := RequiredAliasT - }, - Msg -) -> - [ - emqx_utils_json:encode( - #{ - delete => filter([ - {'_index', emqx_placeholder:proc_tmpl(IndexT, Msg)}, - {'_id', emqx_placeholder:proc_tmpl(IdT, Msg)}, - {required_alias, emqx_placeholder:proc_tmpl(RequiredAliasT, Msg)} - ]) - } - ), - "\n" - ]; -proc_data( - #{ - action := update, - '_index' := IndexT, - '_id' := IdT, - require_alias := RequiredAliasT, - doc_as_upsert := DocAsUpsert, - upsert := Upsert, - fields := FieldsT - }, - Msg -) -> - [ - emqx_utils_json:encode( - #{ - update => filter([ - {'_index', emqx_placeholder:proc_tmpl(IndexT, Msg)}, - {'_id', emqx_placeholder:proc_tmpl(IdT, Msg)}, - {required_alias, emqx_placeholder:proc_tmpl(RequiredAliasT, Msg)}, - {doc_as_upsert, emqx_placeholder:proc_tmpl(DocAsUpsert, Msg)}, - {upsert, emqx_placeholder:proc_tmpl(Upsert, Msg)} - ]) - } - ), - "\n{\"doc\":", - emqx_placeholder:proc_tmpl(FieldsT, Msg), - "}\n" - ]. - -filter(List) -> - Fun = fun - ({_K, V}) when V =:= undefined; V =:= <<"undefined">>; V =:= "undefined" -> - false; - ({_K, V}) when V =:= ""; V =:= <<>> -> - false; - ({_K, V}) when V =:= "false" -> {true, false}; - ({_K, V}) when V =:= "true" -> {true, true}; - ({_K, _V}) -> - true - end, - maps:from_list(lists:filtermap(Fun, List)). - -handle_response({ok, 200, _Headers, Body} = Resp) -> - eval_response_body(Body, Resp); -handle_response({ok, 200, Body} = Resp) -> - eval_response_body(Body, Resp); +handle_response({ok, Code, _Headers, _Body} = Resp) when Code =:= 200; Code =:= 201 -> + Resp; +handle_response({ok, Code, _Body} = Resp) when Code =:= 200; Code =:= 201 -> + Resp; handle_response({ok, Code, _Headers, Body}) -> {error, #{code => Code, body => Body}}; handle_response({ok, Code, Body}) -> @@ -450,49 +369,5 @@ handle_response({ok, Code, Body}) -> handle_response({error, _} = Error) -> Error. -eval_response_body(<<"{}">>, Resp) -> Resp; -eval_response_body(Body, _Resp) -> {error, emqx_utils_json:decode(Body)}. - -preproc_data_template(DataList) when is_list(DataList) -> - [ - begin - preproc_data_template(Data) - end - || Data <- DataList - ]; -preproc_data_template(#{action := create} = Data) -> - Index = maps:get('_index', Data, ""), - Id = maps:get('_id', Data, ""), - RequiredAlias = maps:get(require_alias, Data, ""), - Fields = maps:get(fields, Data, ""), - #{ - action => create, - '_index' => emqx_placeholder:preproc_tmpl(Index), - '_id' => emqx_placeholder:preproc_tmpl(Id), - require_alias => emqx_placeholder:preproc_tmpl(RequiredAlias), - fields => emqx_placeholder:preproc_tmpl(Fields) - }; -preproc_data_template(#{action := index} = Data) -> - Data1 = preproc_data_template(Data#{action => create}), - Data1#{action => index}; -preproc_data_template(#{action := delete} = Data) -> - Data1 = preproc_data_template(Data#{action => create}), - Data2 = Data1#{action => delete}, - maps:remove(fields, Data2); -preproc_data_template(#{action := update} = Data) -> - Data1 = preproc_data_template(Data#{action => index}), - DocAsUpsert = maps:get(doc_as_upsert, Data, ""), - Upsert = maps:get(upsert, Data, ""), - Data1#{ - action => update, - doc_as_upsert => emqx_placeholder:preproc_tmpl(DocAsUpsert), - upsert => emqx_placeholder:preproc_tmpl(Upsert) - }. - -try_render_message({ChannelId, Msg}, Channels) -> - case maps:find(ChannelId, Channels) of - {ok, #{data := Data}} -> - {ok, proc_data(Data, Msg)}; - _ -> - {error, {unrecoverable_error, {invalid_channel_id, ChannelId}}} - end. +get_body_template(#{doc := Doc}) -> Doc; +get_body_template(_) -> undefined. diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl index f00ae8523..8f54694e9 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl @@ -317,7 +317,7 @@ on_query(InstId, {send_message, Msg}, State) -> %% BridgeV2 entrypoint on_query( InstId, - {ActionId, MsgAndBody}, + {ActionId, Msg}, State = #{installed_actions := InstalledActions} ) when is_binary(ActionId) -> case {maps:get(request, State, undefined), maps:get(ActionId, InstalledActions, undefined)} of @@ -334,10 +334,10 @@ on_query( body := Body, headers := Headers, request_timeout := Timeout - } = process_request_and_action(Request, ActionState, MsgAndBody), + } = process_request_and_action(Request, ActionState, Msg), %% bridge buffer worker has retry, do not let ehttpc retry Retry = 2, - ClientId = clientid(MsgAndBody), + ClientId = clientid(Msg), on_query( InstId, {ClientId, Method, {Path, Headers, Body}, Timeout, Retry}, @@ -430,7 +430,7 @@ on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) -> %% BridgeV2 entrypoint on_query_async( InstId, - {ActionId, MsgAndBody}, + {ActionId, Msg}, ReplyFunAndArgs, State = #{installed_actions := InstalledActions} ) when is_binary(ActionId) -> @@ -448,8 +448,8 @@ on_query_async( body := Body, headers := Headers, request_timeout := Timeout - } = process_request_and_action(Request, ActionState, MsgAndBody), - ClientId = clientid(MsgAndBody), + } = process_request_and_action(Request, ActionState, Msg), + ClientId = clientid(Msg), on_query_async( InstId, {ClientId, Method, {Path, Headers, Body}, Timeout}, @@ -629,7 +629,7 @@ maybe_parse_template(Key, Conf) -> parse_template(String) -> emqx_template:parse(String). -process_request_and_action(Request, ActionState, {Msg, Body}) -> +process_request_and_action(Request, ActionState, Msg) -> MethodTemplate = maps:get(method, ActionState), Method = make_method(render_template_string(MethodTemplate, Msg)), PathPrefix = unicode:characters_to_list(render_template(maps:get(path, Request), Msg)), @@ -647,17 +647,15 @@ process_request_and_action(Request, ActionState, {Msg, Body}) -> render_headers(HeadersTemplate1, Msg), render_headers(HeadersTemplate2, Msg) ), + BodyTemplate = maps:get(body, ActionState), + Body = render_request_body(BodyTemplate, Msg), #{ method => Method, path => Path, body => Body, headers => Headers, request_timeout => maps:get(request_timeout, ActionState) - }; -process_request_and_action(Request, ActionState, Msg) -> - BodyTemplate = maps:get(body, ActionState), - Body = render_request_body(BodyTemplate, Msg), - process_request_and_action(Request, ActionState, {Msg, Body}). + }. merge_proplist(Proplist1, Proplist2) -> lists:foldl( @@ -877,7 +875,6 @@ redact_request({Path, Headers}) -> redact_request({Path, Headers, _Body}) -> {Path, Headers, <<"******">>}. -clientid({Msg, _Body}) -> clientid(Msg); clientid(Msg) -> maps:get(clientid, Msg, undefined). -ifdef(TEST). diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index 822e8429b..655892d88 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -190,7 +190,7 @@ connector_structs() -> mk( hoconsc:map(name, ref(emqx_bridge_es_connector, config)), #{ - desc => <<"Elastis Search Connector Config">>, + desc => <<"ElasticSearch Connector Config">>, required => false } )} diff --git a/rel/i18n/emqx_bridge_es.hocon b/rel/i18n/emqx_bridge_es.hocon index 78299c4ee..62778a712 100644 --- a/rel/i18n/emqx_bridge_es.hocon +++ b/rel/i18n/emqx_bridge_es.hocon @@ -1,5 +1,10 @@ emqx_bridge_es { +elasticsearch.desc: +"""Elasticsearch Bridge""" +elasticsearch.label: +"""ElasticSearch""" + config_enable.desc: """Enable or disable this bridge""" @@ -74,15 +79,9 @@ desc_name.desc: desc_name.label: """Bridge Name""" -config_parameters_action.desc: -"""TODO""" - -config_parameters_action.label: -"""Action""" - config_parameters_index.desc: -"""Name of the data stream, index, or index alias to perform the action on. -This parameter is required if a is not specified in the request path.""" +"""Name of index, or index alias to perform the action on. +This parameter is required.""" config_parameters_index.label: """_index""" @@ -97,28 +96,10 @@ config_parameters_require_alias.desc: config_parameters_require_alias.label: """_require_alias""" -config_parameters_fields.desc: -"""The document source to index. Required for create and index operations.""" -config_parameters_fields.label: -"""fields""" - -config_parameters_doc_as_upsert.desc: -"""Instead of sending a partial doc plus an upsert doc, you can set doc_as_upsert to true -to use the contents of doc as the upsert value.""" -config_parameters_doc_as_upsert.label: -"""doc_as_upsert""" - -config_parameters_upsert.desc: -"""If the document does not already exist, the contents of the upsert element are inserted as a new document.""" -config_parameters_upsert.label: -"""upsert""" - - -action_parameters_data.desc: -"""ElasticSearch action parameter data""" - -action_parameters_data.label: -"""Parameter Data""" +config_parameters_doc.desc: +"""JSON document""" +config_parameters_doc.label: +"""doc""" action_parameters.desc: """ElasticSearch action parameters""" @@ -126,4 +107,38 @@ action_parameters.desc: action_parameters.label: """Parameters""" +config_overwrite.desc: +"""Set to false If a document with the specified _id already exists(conflict), the operation will fail.""" + +config_overwrite.label: +"""overwrite""" + +action_config.desc: +"""ElasticSearch Action Configuration""" +action_config.label: +"""ElasticSearch Action Config""" + +action_create.desc: +"""Adds a JSON document to the specified index and makes it searchable. +If the target is an index and the document already exists, +the request updates the document and increments its version.""" +action_create.label: +"""Create Doc""" + +action_delete.desc: +"""Removes a JSON document from the specified index.""" +action_delete.label: +"""Delete Doc""" + +action_update.desc: +"""Updates a document using the specified doc.""" +action_update.label: +"""Update Doc""" + +action_resource_opts.desc: +"""Resource options.""" + +action_resource_opts.label: +"""Resource Options""" + } diff --git a/rel/i18n/emqx_bridge_es_connector.hocon b/rel/i18n/emqx_bridge_es_connector.hocon index f980b3aca..ddd53e0fc 100644 --- a/rel/i18n/emqx_bridge_es_connector.hocon +++ b/rel/i18n/emqx_bridge_es_connector.hocon @@ -24,11 +24,6 @@ config_auth_basic_password.desc: config_auth_basic_password.label: """HTTP Basic Auth Password""" -config_base_url.desc: -"""The base URL of the external ElasticSearch service's REST interface.""" -config_base_url.label: -"""ElasticSearch REST Service Base URL""" - config_max_retries.desc: """HTTP request max retry times if failed.""" diff --git a/scripts/spellcheck/dicts/emqx.txt b/scripts/spellcheck/dicts/emqx.txt index 401df33a6..1d98d82db 100644 --- a/scripts/spellcheck/dicts/emqx.txt +++ b/scripts/spellcheck/dicts/emqx.txt @@ -297,3 +297,5 @@ Syskeeper msacc now_us ns +elasticsearch +ElasticSearch