diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es.erl b/apps/emqx_bridge_es/src/emqx_bridge_es.erl index 57ab648b5..b47e5f9f8 100644 --- a/apps/emqx_bridge_es/src/emqx_bridge_es.erl +++ b/apps/emqx_bridge_es/src/emqx_bridge_es.erl @@ -31,9 +31,9 @@ fields(action) -> )}; fields(action_config) -> emqx_resource_schema:override( - emqx_bridge_v2_schema:make_producer_action_schema( + emqx_bridge_v2_schema:make_consumer_action_schema( ?HOCON( - ?R_REF(action_parameters), + ?UNION(fun action_union_member_selector/1), #{ required => true, desc => ?DESC("action_parameters") } @@ -54,200 +54,28 @@ fields(action_resource_opts) -> end, emqx_bridge_v2_schema:resource_opts_fields() ); -fields(action_parameters) -> +fields(action_create) -> [ - {target, - ?HOCON( - binary(), - #{ - desc => ?DESC("config_target"), - required => false - } - )}, - {require_alias, - ?HOCON( - boolean(), - #{ - required => false, - default => false, - desc => ?DESC("config_require_alias") - } - )}, - {routing, - ?HOCON( - binary(), - #{ - required => false, - desc => ?DESC("config_routing") - } - )}, - {wait_for_active_shards, - ?HOCON( - ?UNION([pos_integer(), all]), - #{ - required => false, - desc => ?DESC("config_wait_for_active_shards") - } - )}, - {data, - ?HOCON( - ?ARRAY( - ?UNION( - [ - ?R_REF(create), - ?R_REF(delete), - ?R_REF(index), - ?R_REF(update) - ] - ) - ), - #{ - desc => ?DESC("action_parameters_data") - } - )} - ] ++ - lists:filter( - fun({K, _}) -> - not lists:member(K, [path, method, body, headers, request_timeout]) - end, - emqx_bridge_http_schema:fields("parameters_opts") - ); -fields(Action) when Action =:= create; Action =:= index -> - [ - {action, - ?HOCON( - Action, - #{ - desc => atom_to_binary(Action), - required => true - } - )}, - {'_index', - ?HOCON( - binary(), - #{ - required => false, - desc => ?DESC("config_parameters_index") - } - )}, - {'_id', - ?HOCON( - binary(), - #{ - required => false, - desc => ?DESC("config_parameters_id") - } - )}, - {require_alias, - ?HOCON( - binary(), - #{ - required => false, - desc => ?DESC("config_parameters_require_alias") - } - )}, - {fields, - ?HOCON( - binary(), - #{ - required => true, - desc => ?DESC("config_parameters_fields") - } - )} + action(create), + index(), + id(false), + doc(true), + routing(), + require_alias(), + overwrite() + | http_common_opts() ]; -fields(delete) -> +fields(action_delete) -> + [action(delete), index(), id(true), routing() | http_common_opts()]; +fields(action_update) -> [ - {action, - ?HOCON( - delete, - #{ - desc => <<"Delete">>, - required => true - } - )}, - {'_index', - ?HOCON( - binary(), - #{ - required => false, - desc => ?DESC("config_parameters_index") - } - )}, - {'_id', - ?HOCON( - binary(), - #{ - required => true, - desc => ?DESC("config_parameters_id") - } - )}, - {require_alias, - ?HOCON( - binary(), - #{ - required => false, - desc => ?DESC("config_parameters_require_alias") - } - )} - ]; -fields(update) -> - [ - {action, - ?HOCON( - update, - #{ - desc => <<"Update">>, - required => true - } - )}, - {doc_as_upsert, - ?HOCON( - binary(), - #{ - required => false, - desc => ?DESC("config_parameters_doc_as_upsert") - } - )}, - {upsert, - ?HOCON( - binary(), - #{ - required => false, - desc => ?DESC("config_parameters_upsert") - } - )}, - {'_index', - ?HOCON( - binary(), - #{ - required => false, - desc => ?DESC("config_parameters_index") - } - )}, - {'_id', - ?HOCON( - binary(), - #{ - required => true, - desc => ?DESC("config_parameters_id") - } - )}, - {require_alias, - ?HOCON( - binary(), - #{ - required => false, - desc => ?DESC("config_parameters_require_alias") - } - )}, - {fields, - ?HOCON( - binary(), - #{ - required => true, - desc => ?DESC("config_parameters_fields") - } - )} + action(update), + index(), + id(true), + doc(true), + routing(), + require_alias() + | http_common_opts() ]; fields("post_bridge_v2") -> emqx_bridge_schema:type_and_name_fields(elasticsearch) ++ fields(action_config); @@ -256,6 +84,111 @@ fields("put_bridge_v2") -> fields("get_bridge_v2") -> emqx_bridge_schema:status_fields() ++ fields("post_bridge_v2"). +action_union_member_selector(all_union_members) -> + [ + ?R_REF(action_create), + ?R_REF(action_delete), + ?R_REF(action_update) + ]; +action_union_member_selector({value, Value}) -> + case Value of + #{<<"action">> := <<"create">>} -> + [?R_REF(action_create)]; + #{<<"action">> := <<"delete">>} -> + [?R_REF(action_delete)]; + #{<<"action">> := <<"update">>} -> + [?R_REF(action_update)]; + _ -> + Expected = "create | delete | update", + throw(#{ + field_name => action, + expected => Expected + }) + end. + +action(Action) -> + {action, + ?HOCON( + Action, + #{ + required => true, + desc => atom_to_binary(Action) + } + )}. + +overwrite() -> + {overwrite, + ?HOCON( + boolean(), + #{ + required => false, + default => true, + desc => ?DESC("config_overwrite") + } + )}. + +index() -> + {index, + ?HOCON( + binary(), + #{ + required => true, + example => <<"${payload.index}">>, + desc => ?DESC("config_parameters_target") + } + )}. + +id(Required) -> + {'id', + ?HOCON( + binary(), + #{ + required => Required, + example => <<"${payload.id}">>, + desc => ?DESC("config_parameters_id") + } + )}. + +doc(Required) -> + {'doc', + ?HOCON( + binary(), + #{ + required => Required, + example => <<"${payload.doc}">>, + desc => ?DESC("config_parameters_doc") + } + )}. + +http_common_opts() -> + lists:filter( + fun({K, _}) -> + not lists:member(K, [path, method, body, headers, request_timeout]) + end, + emqx_bridge_http_schema:fields("parameters_opts") + ). + +routing() -> + {routing, + ?HOCON( + binary(), + #{ + required => false, + example => <<"${payload.routing}">>, + desc => ?DESC("config_routing") + } + )}. + +require_alias() -> + {require_alias, + ?HOCON( + boolean(), + #{ + required => false, + desc => ?DESC("config_require_alias") + } + )}. + bridge_v2_examples(Method) -> [ #{ @@ -272,34 +205,10 @@ bridge_v2_examples(Method) -> action_values() -> #{ parameters => #{ - target => <<"${target_index}">>, - data => [ - #{ - action => index, - '_index' => <<"${index}">>, - fields => <<"${fields}">>, - require_alias => <<"${require_alias}">> - }, - #{ - action => create, - '_index' => <<"${index}">>, - fields => <<"${fields}">> - }, - #{ - action => delete, - '_index' => <<"${index}">>, - '_id' => <<"${id}">> - }, - #{ - action => update, - '_index' => <<"${index}">>, - '_id' => <<"${id}">>, - fields => <<"${fields}">>, - require_alias => false, - doc_as_upsert => <<"${doc_as_upsert}">>, - upsert => <<"${upsert}">> - } - ] + action => create, + index => <<"${payload.index}">>, + overwrite => true, + doc => <<"${payload.doc}">> } }. diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl b/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl index 22509e037..f89635b26 100644 --- a/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl +++ b/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl @@ -233,7 +233,7 @@ on_get_status(InstanceId, State) -> {ok, pos_integer(), [term()], term()} | {ok, pos_integer(), [term()]} | {error, term()}. -on_query(InstanceId, {ChannelId, Msg} = Req, #{channels := Channels} = State) -> +on_query(InstanceId, {ChannelId, Msg} = Req, State) -> ?tp(elasticsearch_bridge_on_query, #{instance_id => InstanceId}), ?SLOG(debug, #{ msg => "elasticsearch_bridge_on_query_called", @@ -241,21 +241,16 @@ on_query(InstanceId, {ChannelId, Msg} = Req, #{channels := Channels} = State) -> send_message => Req, state => emqx_utils:redact(State) }), - case try_render_message(Req, Channels) of - {ok, Body} -> - handle_response( - emqx_bridge_http_connector:on_query( - InstanceId, {ChannelId, {Msg, Body}}, State - ) - ); - Error -> - Error - end. + handle_response( + emqx_bridge_http_connector:on_query( + InstanceId, {ChannelId, Msg}, State + ) + ). -spec on_query_async(manager_id(), tuple(), {function(), [term()]}, state()) -> {ok, pid()} | {error, empty_request}. on_query_async( - InstanceId, {ChannelId, Msg} = Req, ReplyFunAndArgs0, #{channels := Channels} = State + InstanceId, {ChannelId, Msg} = Req, ReplyFunAndArgs0, State ) -> ?tp(elasticsearch_bridge_on_query_async, #{instance_id => InstanceId}), ?SLOG(debug, #{ @@ -264,22 +259,17 @@ on_query_async( send_message => Req, state => emqx_utils:redact(State) }), - case try_render_message(Req, Channels) of - {ok, Payload} -> - ReplyFunAndArgs = - { - fun(Result) -> - Response = handle_response(Result), - emqx_resource:apply_reply_fun(ReplyFunAndArgs0, Response) - end, - [] - }, - emqx_bridge_http_connector:on_query_async( - InstanceId, {ChannelId, {Msg, Payload}}, ReplyFunAndArgs, State - ); - Error -> - Error - end. + ReplyFunAndArgs = + { + fun(Result) -> + Response = handle_response(Result), + emqx_resource:apply_reply_fun(ReplyFunAndArgs0, Response) + end, + [] + }, + emqx_bridge_http_connector:on_query_async( + InstanceId, {ChannelId, Msg}, ReplyFunAndArgs, State + ). on_add_channel( InstanceId, @@ -291,19 +281,17 @@ on_add_channel( true -> {error, already_exists}; _ -> - #{data := Data} = Parameter, - Parameter1 = Parameter#{path => path(Parameter), method => <<"post">>}, + Parameter1 = Parameter#{ + path => path(Parameter), + method => method(Parameter), + body => get_body_template(Parameter) + }, {ok, State} = emqx_bridge_http_connector:on_add_channel( InstanceId, State0, ChannelId, #{parameters => Parameter1} ), - case preproc_data_template(Data) of - [] -> - {error, invalid_data}; - DataTemplate -> - Channel = Parameter1#{data => DataTemplate}, - Channels2 = Channels#{ChannelId => Channel}, - {ok, State#{channels => Channels2}} - end + Channel = Parameter1, + Channels2 = Channels#{ChannelId => Channel}, + {ok, State#{channels => Channels2}} end. on_remove_channel(InstanceId, #{channels := Channels} = OldState0, ChannelId) -> @@ -325,124 +313,59 @@ on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) -> %%-------------------------------------------------------------------- %% Internal Functions %%-------------------------------------------------------------------- -path(Param) -> - Target = maps:get(target, Param, undefined), - QString0 = maps:fold( - fun(K, V, Acc) -> - [[atom_to_list(K), "=", to_str(V)] | Acc] +%% delete DELETE //_doc/<_id> +path(#{action := delete, id := Id, index := Index} = Action) -> + BasePath = ["/", Index, "/_doc/", Id], + Qs = add_query_string([routing], Action), + BasePath ++ Qs; +%% update POST //_update/<_id> +path(#{action := update, id := Id, index := Index} = Action) -> + BasePath = ["/", Index, "/_update/", Id], + Qs = add_query_string([routing, require_alias], Action), + BasePath ++ Qs; +%% create with id //_doc/_id +path(#{action := create, index := Index, id := Id} = Action) -> + BasePath = ["/", Index, "/_doc/", Id], + Qs = + case maps:get(overwrite, Action, true) of + true -> + add_query_string([routing, require_alias], Action); + false -> + Action1 = Action#{op_type => "create"}, + add_query_string([routing, require_alias, op_type], Action1) end, - [["_source=false"], ["filter_path=items.*.error"]], - maps:with([require_alias, routing, wait_for_active_shards], Param) - ), - QString = "?" ++ lists:join("&", QString0), - target(Target) ++ QString. + BasePath ++ Qs; +%% create without id POST //_doc/ +path(#{action := create, index := Index} = Action) -> + BasePath = ["/", Index, "/_doc/"], + Qs = add_query_string([routing, require_alias], Action), + BasePath ++ Qs. -target(undefined) -> "/_bulk"; -target(Str) -> "/" ++ binary_to_list(Str) ++ "/_bulk". +method(#{action := create}) -> <<"POST">>; +method(#{action := delete}) -> <<"DELETE">>; +method(#{action := update}) -> <<"POST">>. + +add_query_string(_Keys, Param) when map_size(Param) =:= 0 -> ""; +add_query_string(Keys, Param) -> + QString = + maps:fold( + fun(K, V, Acc) -> + [[atom_to_list(K), "=", to_str(V)] | Acc] + end, + [], + maps:with(Keys, Param) + ), + "?" ++ lists:join("&", QString). to_str(List) when is_list(List) -> List; to_str(false) -> "false"; to_str(true) -> "true"; to_str(Atom) when is_atom(Atom) -> atom_to_list(Atom). -proc_data(DataList, Msg) when is_list(DataList) -> - [ - begin - proc_data(Data, Msg) - end - || Data <- DataList - ]; -proc_data( - #{ - action := Action, - '_index' := IndexT, - '_id' := IdT, - require_alias := RequiredAliasT, - fields := FieldsT - }, - Msg -) when Action =:= create; Action =:= index -> - [ - emqx_utils_json:encode( - #{ - Action => filter([ - {'_index', emqx_placeholder:proc_tmpl(IndexT, Msg)}, - {'_id', emqx_placeholder:proc_tmpl(IdT, Msg)}, - {required_alias, emqx_placeholder:proc_tmpl(RequiredAliasT, Msg)} - ]) - } - ), - "\n", - emqx_placeholder:proc_tmpl(FieldsT, Msg), - "\n" - ]; -proc_data( - #{ - action := delete, - '_index' := IndexT, - '_id' := IdT, - require_alias := RequiredAliasT - }, - Msg -) -> - [ - emqx_utils_json:encode( - #{ - delete => filter([ - {'_index', emqx_placeholder:proc_tmpl(IndexT, Msg)}, - {'_id', emqx_placeholder:proc_tmpl(IdT, Msg)}, - {required_alias, emqx_placeholder:proc_tmpl(RequiredAliasT, Msg)} - ]) - } - ), - "\n" - ]; -proc_data( - #{ - action := update, - '_index' := IndexT, - '_id' := IdT, - require_alias := RequiredAliasT, - doc_as_upsert := DocAsUpsert, - upsert := Upsert, - fields := FieldsT - }, - Msg -) -> - [ - emqx_utils_json:encode( - #{ - update => filter([ - {'_index', emqx_placeholder:proc_tmpl(IndexT, Msg)}, - {'_id', emqx_placeholder:proc_tmpl(IdT, Msg)}, - {required_alias, emqx_placeholder:proc_tmpl(RequiredAliasT, Msg)}, - {doc_as_upsert, emqx_placeholder:proc_tmpl(DocAsUpsert, Msg)}, - {upsert, emqx_placeholder:proc_tmpl(Upsert, Msg)} - ]) - } - ), - "\n{\"doc\":", - emqx_placeholder:proc_tmpl(FieldsT, Msg), - "}\n" - ]. - -filter(List) -> - Fun = fun - ({_K, V}) when V =:= undefined; V =:= <<"undefined">>; V =:= "undefined" -> - false; - ({_K, V}) when V =:= ""; V =:= <<>> -> - false; - ({_K, V}) when V =:= "false" -> {true, false}; - ({_K, V}) when V =:= "true" -> {true, true}; - ({_K, _V}) -> - true - end, - maps:from_list(lists:filtermap(Fun, List)). - -handle_response({ok, 200, _Headers, Body} = Resp) -> - eval_response_body(Body, Resp); -handle_response({ok, 200, Body} = Resp) -> - eval_response_body(Body, Resp); +handle_response({ok, Code, _Headers, _Body} = Resp) when Code =:= 200; Code =:= 201 -> + Resp; +handle_response({ok, Code, _Body} = Resp) when Code =:= 200; Code =:= 201 -> + Resp; handle_response({ok, Code, _Headers, Body}) -> {error, #{code => Code, body => Body}}; handle_response({ok, Code, Body}) -> @@ -450,49 +373,5 @@ handle_response({ok, Code, Body}) -> handle_response({error, _} = Error) -> Error. -eval_response_body(<<"{}">>, Resp) -> Resp; -eval_response_body(Body, _Resp) -> {error, emqx_utils_json:decode(Body)}. - -preproc_data_template(DataList) when is_list(DataList) -> - [ - begin - preproc_data_template(Data) - end - || Data <- DataList - ]; -preproc_data_template(#{action := create} = Data) -> - Index = maps:get('_index', Data, ""), - Id = maps:get('_id', Data, ""), - RequiredAlias = maps:get(require_alias, Data, ""), - Fields = maps:get(fields, Data, ""), - #{ - action => create, - '_index' => emqx_placeholder:preproc_tmpl(Index), - '_id' => emqx_placeholder:preproc_tmpl(Id), - require_alias => emqx_placeholder:preproc_tmpl(RequiredAlias), - fields => emqx_placeholder:preproc_tmpl(Fields) - }; -preproc_data_template(#{action := index} = Data) -> - Data1 = preproc_data_template(Data#{action => create}), - Data1#{action => index}; -preproc_data_template(#{action := delete} = Data) -> - Data1 = preproc_data_template(Data#{action => create}), - Data2 = Data1#{action => delete}, - maps:remove(fields, Data2); -preproc_data_template(#{action := update} = Data) -> - Data1 = preproc_data_template(Data#{action => index}), - DocAsUpsert = maps:get(doc_as_upsert, Data, ""), - Upsert = maps:get(upsert, Data, ""), - Data1#{ - action => update, - doc_as_upsert => emqx_placeholder:preproc_tmpl(DocAsUpsert), - upsert => emqx_placeholder:preproc_tmpl(Upsert) - }. - -try_render_message({ChannelId, Msg}, Channels) -> - case maps:find(ChannelId, Channels) of - {ok, #{data := Data}} -> - {ok, proc_data(Data, Msg)}; - _ -> - {error, {unrecoverable_error, {invalid_channel_id, ChannelId}}} - end. +get_body_template(#{doc := Doc}) -> Doc; +get_body_template(_) -> undefined. diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl index f00ae8523..8f54694e9 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl @@ -317,7 +317,7 @@ on_query(InstId, {send_message, Msg}, State) -> %% BridgeV2 entrypoint on_query( InstId, - {ActionId, MsgAndBody}, + {ActionId, Msg}, State = #{installed_actions := InstalledActions} ) when is_binary(ActionId) -> case {maps:get(request, State, undefined), maps:get(ActionId, InstalledActions, undefined)} of @@ -334,10 +334,10 @@ on_query( body := Body, headers := Headers, request_timeout := Timeout - } = process_request_and_action(Request, ActionState, MsgAndBody), + } = process_request_and_action(Request, ActionState, Msg), %% bridge buffer worker has retry, do not let ehttpc retry Retry = 2, - ClientId = clientid(MsgAndBody), + ClientId = clientid(Msg), on_query( InstId, {ClientId, Method, {Path, Headers, Body}, Timeout, Retry}, @@ -430,7 +430,7 @@ on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) -> %% BridgeV2 entrypoint on_query_async( InstId, - {ActionId, MsgAndBody}, + {ActionId, Msg}, ReplyFunAndArgs, State = #{installed_actions := InstalledActions} ) when is_binary(ActionId) -> @@ -448,8 +448,8 @@ on_query_async( body := Body, headers := Headers, request_timeout := Timeout - } = process_request_and_action(Request, ActionState, MsgAndBody), - ClientId = clientid(MsgAndBody), + } = process_request_and_action(Request, ActionState, Msg), + ClientId = clientid(Msg), on_query_async( InstId, {ClientId, Method, {Path, Headers, Body}, Timeout}, @@ -629,7 +629,7 @@ maybe_parse_template(Key, Conf) -> parse_template(String) -> emqx_template:parse(String). -process_request_and_action(Request, ActionState, {Msg, Body}) -> +process_request_and_action(Request, ActionState, Msg) -> MethodTemplate = maps:get(method, ActionState), Method = make_method(render_template_string(MethodTemplate, Msg)), PathPrefix = unicode:characters_to_list(render_template(maps:get(path, Request), Msg)), @@ -647,17 +647,15 @@ process_request_and_action(Request, ActionState, {Msg, Body}) -> render_headers(HeadersTemplate1, Msg), render_headers(HeadersTemplate2, Msg) ), + BodyTemplate = maps:get(body, ActionState), + Body = render_request_body(BodyTemplate, Msg), #{ method => Method, path => Path, body => Body, headers => Headers, request_timeout => maps:get(request_timeout, ActionState) - }; -process_request_and_action(Request, ActionState, Msg) -> - BodyTemplate = maps:get(body, ActionState), - Body = render_request_body(BodyTemplate, Msg), - process_request_and_action(Request, ActionState, {Msg, Body}). + }. merge_proplist(Proplist1, Proplist2) -> lists:foldl( @@ -877,7 +875,6 @@ redact_request({Path, Headers}) -> redact_request({Path, Headers, _Body}) -> {Path, Headers, <<"******">>}. -clientid({Msg, _Body}) -> clientid(Msg); clientid(Msg) -> maps:get(clientid, Msg, undefined). -ifdef(TEST). diff --git a/rel/i18n/emqx_bridge_es.hocon b/rel/i18n/emqx_bridge_es.hocon index 78299c4ee..63b0fa4bc 100644 --- a/rel/i18n/emqx_bridge_es.hocon +++ b/rel/i18n/emqx_bridge_es.hocon @@ -126,4 +126,10 @@ action_parameters.desc: action_parameters.label: """Parameters""" +config_overwrite.desc: +"""Set to false If a document with the specified _id already exists(conflict), the operation will failed.""" + +config_overwrite.label: +"""overwrite""" + }