From 59797cfea7a2f79da3d95e15c1279d8c0a9deafd Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Thu, 18 Jan 2024 15:43:02 +0800 Subject: [PATCH] feat: es's update support doc_as_upsert --- apps/emqx_bridge_es/src/emqx_bridge_es.erl | 12 ++ .../src/emqx_bridge_es_connector.erl | 33 +++++- .../test/emqx_bridge_es_SUITE.erl | 109 ++++++++++-------- .../src/emqx_bridge_http_connector.erl | 42 ++++--- rel/i18n/emqx_bridge_es.hocon | 6 + 5 files changed, 129 insertions(+), 73 deletions(-) diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es.erl b/apps/emqx_bridge_es/src/emqx_bridge_es.erl index 569b67dba..032439574 100644 --- a/apps/emqx_bridge_es/src/emqx_bridge_es.erl +++ b/apps/emqx_bridge_es/src/emqx_bridge_es.erl @@ -73,6 +73,7 @@ fields(action_update) -> index(), id(true), doc(), + doc_as_upsert(), routing(), require_alias() | http_common_opts() @@ -172,6 +173,17 @@ http_common_opts() -> emqx_bridge_http_schema:fields("parameters_opts") ). +doc_as_upsert() -> + {doc_as_upsert, + ?HOCON( + boolean(), + #{ + required => false, + default => false, + desc => ?DESC("config_doc_as_upsert") + } + )}. + routing() -> {routing, ?HOCON( diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl b/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl index 7e49aeb55..8b68af10f 100644 --- a/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl +++ b/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl @@ -33,6 +33,8 @@ connector_example_values/0 ]). +-export([render_template/2]). + %% emqx_connector_resource behaviour callbacks -export([connector_config/2]). @@ -286,8 +288,12 @@ on_add_channel( method => method(Parameter), body => get_body_template(Parameter) }, + ChannelConfig = #{ + parameters => Parameter1, + render_template_func => fun ?MODULE:render_template/2 + }, {ok, State} = emqx_bridge_http_connector:on_add_channel( - InstanceId, State0, ChannelId, #{parameters => Parameter1} + InstanceId, State0, ChannelId, ChannelConfig ), Channel = Parameter1, Channels2 = Channels#{ChannelId => Channel}, @@ -310,9 +316,23 @@ on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) -> {error, not_exists} end. +render_template(Template, Msg) -> + % Ignoring errors here, undefined bindings will be replaced with empty string. + Opts = #{var_trans => fun to_string/2}, + {String, _Errors} = emqx_template:render(Template, {emqx_jsonish, Msg}, Opts), + String. + %%-------------------------------------------------------------------- %% Internal Functions %%-------------------------------------------------------------------- + +to_string(Name, Value) -> + emqx_template:to_string(render_var(Name, Value)). +render_var(_, undefined) -> + % NOTE Any allowed but undefined binding will be replaced with empty string + <<>>; +render_var(_Name, Value) -> + Value. %% delete DELETE //_doc/<_id> path(#{action := delete, id := Id, index := Index} = Action) -> BasePath = ["/", Index, "/_doc/", Id], @@ -370,5 +390,12 @@ handle_response({ok, Code, Body}) -> handle_response({error, _} = Error) -> Error. -get_body_template(#{doc := Doc}) -> Doc; -get_body_template(_) -> undefined. +get_body_template(#{action := update, doc := Doc} = Template) -> + case maps:get(doc_as_upsert, Template, false) of + false -> <<"{\"doc\":", Doc/binary, "}">>; + true -> <<"{\"doc\":", Doc/binary, ",\"doc_as_upsert\": true}">> + end; +get_body_template(#{doc := Doc}) -> + Doc; +get_body_template(_) -> + undefined. diff --git a/apps/emqx_bridge_es/test/emqx_bridge_es_SUITE.erl b/apps/emqx_bridge_es/test/emqx_bridge_es_SUITE.erl index e7e2dba28..a9ff70957 100644 --- a/apps/emqx_bridge_es/test/emqx_bridge_es_SUITE.erl +++ b/apps/emqx_bridge_es/test/emqx_bridge_es_SUITE.erl @@ -103,45 +103,46 @@ end_per_testcase(_TestCase, _Config) -> %% Helper fns %%------------------------------------------------------------------------------------- -check_send_message_with_action(ActionName, ConnectorName) -> - #{payload := _Payload} = send_message(ActionName), +check_send_message_with_action(Topic, ActionName, ConnectorName) -> + send_message(Topic), %% ###################################### %% Check if message is sent to es %% ###################################### + timer:sleep(500), check_action_metrics(ActionName, ConnectorName). -send_message(ActionName) -> - %% ###################################### - %% Create message - %% ###################################### - Time = erlang:unique_integer(), - BinTime = integer_to_binary(Time), - Payload = #{<<"name">> => <<"emqx">>, <<"release_time">> => BinTime}, +send_message(Topic) -> + Now = emqx_utils_calendar:now_to_rfc3339(microsecond), + Doc = #{<<"name">> => <<"emqx">>, <<"release_date">> => Now}, Index = <<"emqx-test-index">>, - Msg = #{ - clientid => BinTime, - payload => Payload, - timestamp => Time, - index => Index - }, - %% ###################################### - %% Send message - %% ###################################### - emqx_bridge_v2:send_message(?TYPE, ActionName, Msg, #{}), - #{payload => Payload}. + Payload = emqx_utils_json:encode(#{doc => Doc, index => Index}), + + ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), + {ok, Client} = emqtt:start_link([{clientid, ClientId}, {port, 1883}]), + {ok, _} = emqtt:connect(Client), + ok = emqtt:publish(Client, Topic, Payload, [{qos, 0}]), + ok. check_action_metrics(ActionName, ConnectorName) -> ActionId = emqx_bridge_v2:id(?TYPE, ActionName, ConnectorName), Metrics = #{ match => emqx_resource_metrics:matched_get(ActionId), + success => emqx_resource_metrics:success_get(ActionId), failed => emqx_resource_metrics:failed_get(ActionId), queuing => emqx_resource_metrics:queuing_get(ActionId), dropped => emqx_resource_metrics:dropped_get(ActionId) }, ?assertEqual( - #{match => 1, dropped => 0, failed => 0, queuing => 0}, - Metrics + #{ + match => 1, + success => 1, + dropped => 0, + failed => 0, + queuing => 0 + }, + Metrics, + {ActionName, ConnectorName, ActionId} ). action_config(ConnectorName) -> @@ -164,7 +165,7 @@ action(ConnectorName) -> <<"connector">> => ConnectorName, <<"resource_opts">> => #{ <<"health_check_interval">> => <<"30s">>, - <<"query_mode">> => <<"async">> + <<"query_mode">> => <<"sync">> } }. @@ -235,7 +236,8 @@ t_create_remove_list(Config) -> #{ name := <<"test_action_1">>, type := <<"elasticsearch">>, - raw_config := _RawConfig + raw_config := _, + status := connected } = ActionInfo, {ok, _} = emqx_bridge_v2:create(?TYPE, test_action_2, ActionConfig), 2 = length(emqx_bridge_v2:list()), @@ -252,39 +254,44 @@ t_send_message(Config) -> {ok, _} = emqx_connector:create(?TYPE, test_connector2, ConnectorConfig), ActionConfig = action(<<"test_connector2">>), {ok, _} = emqx_bridge_v2:create(?TYPE, test_action_1, ActionConfig), + Rule = #{ + id => <<"rule:t_es">>, + sql => <<"SELECT\n *\nFROM\n \"es/#\"">>, + actions => [<<"elasticsearch:test_action_1">>], + description => <<"sink doc to elasticsearch">> + }, + {ok, _} = emqx_rule_engine:create_rule(Rule), %% Use the action to send a message - check_send_message_with_action(test_action_1, test_connector2), + check_send_message_with_action(<<"es/1">>, test_action_1, test_connector2), %% Create a few more bridges with the same connector and test them - BridgeNames1 = [ - list_to_atom("test_bridge_v2_" ++ integer_to_list(I)) - || I <- lists:seq(2, 10) - ], - lists:foreach( - fun(BridgeName) -> - {ok, _} = emqx_bridge_v2:create(?TYPE, BridgeName, ActionConfig), - check_send_message_with_action(BridgeName, test_connector2) - end, - BridgeNames1 - ), - BridgeNames = [test_bridge_v2_1 | BridgeNames1], - %% Send more messages to the bridges - lists:foreach( - fun(BridgeName) -> - lists:foreach( - fun(_) -> - check_send_message_with_action(BridgeName, test_connector2) - end, - lists:seq(1, 10) - ) - end, - BridgeNames - ), + ActionNames1 = + lists:foldl( + fun(I, Acc) -> + Seq = integer_to_binary(I), + ActionNameStr = "test_action_" ++ integer_to_list(I), + ActionName = list_to_atom(ActionNameStr), + {ok, _} = emqx_bridge_v2:create(?TYPE, ActionName, ActionConfig), + Rule1 = #{ + id => <<"rule:t_es", Seq/binary>>, + sql => <<"SELECT\n *\nFROM\n \"es/", Seq/binary, "\"">>, + actions => [<<"elasticsearch:", (list_to_binary(ActionNameStr))/binary>>], + description => <<"sink doc to elasticsearch">> + }, + {ok, _} = emqx_rule_engine:create_rule(Rule1), + Topic = <<"es/", Seq/binary>>, + check_send_message_with_action(Topic, ActionName, test_connector2), + [ActionName | Acc] + end, + [], + lists:seq(2, 10) + ), + ActionNames = [test_action_1 | ActionNames1], %% Remove all the bridges lists:foreach( fun(BridgeName) -> ok = emqx_bridge_v2:remove(?TYPE, BridgeName) end, - BridgeNames + ActionNames ), emqx_connector:remove(?TYPE, test_connector2), ok. @@ -361,7 +368,7 @@ t_http_api_get(Config) -> <<"max_retries">> := 2, <<"overwrite">> := true }, - <<"resource_opts">> := #{<<"query_mode">> := <<"async">>}, + <<"resource_opts">> := #{<<"query_mode">> := <<"sync">>}, <<"status">> := <<"connected">>, <<"status_reason">> := <<>>, <<"type">> := <<"elasticsearch">> diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl index 8f54694e9..81acec602 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl @@ -266,7 +266,9 @@ on_add_channel( ) -> InstalledActions = maps:get(installed_actions, OldState, #{}), {ok, ActionState} = do_create_http_action(ActionConfig), - NewInstalledActions = maps:put(ActionId, ActionState, InstalledActions), + RenderTemplate = maps:get(render_template_func, ActionConfig, fun render_template/2), + ActionState1 = ActionState#{render_template_func => RenderTemplate}, + NewInstalledActions = maps:put(ActionId, ActionState1, InstalledActions), NewState = maps:put(installed_actions, NewInstalledActions, OldState), {ok, NewState}. @@ -631,9 +633,10 @@ parse_template(String) -> process_request_and_action(Request, ActionState, Msg) -> MethodTemplate = maps:get(method, ActionState), - Method = make_method(render_template_string(MethodTemplate, Msg)), - PathPrefix = unicode:characters_to_list(render_template(maps:get(path, Request), Msg)), - PathSuffix = unicode:characters_to_list(render_template(maps:get(path, ActionState), Msg)), + RenderTmplFunc = maps:get(render_template_func, ActionState), + Method = make_method(render_template_string(MethodTemplate, RenderTmplFunc, Msg)), + PathPrefix = unicode:characters_to_list(RenderTmplFunc(maps:get(path, Request), Msg)), + PathSuffix = unicode:characters_to_list(RenderTmplFunc(maps:get(path, ActionState), Msg)), Path = case PathSuffix of @@ -644,11 +647,11 @@ process_request_and_action(Request, ActionState, Msg) -> HeadersTemplate1 = maps:get(headers, Request), HeadersTemplate2 = maps:get(headers, ActionState), Headers = merge_proplist( - render_headers(HeadersTemplate1, Msg), - render_headers(HeadersTemplate2, Msg) + render_headers(HeadersTemplate1, RenderTmplFunc, Msg), + render_headers(HeadersTemplate2, RenderTmplFunc, Msg) ), BodyTemplate = maps:get(body, ActionState), - Body = render_request_body(BodyTemplate, Msg), + Body = render_request_body(BodyTemplate, RenderTmplFunc, Msg), #{ method => Method, path => Path, @@ -681,25 +684,26 @@ process_request( } = Conf, Msg ) -> + RenderTemplateFun = fun render_template/2, Conf#{ - method => make_method(render_template_string(MethodTemplate, Msg)), - path => unicode:characters_to_list(render_template(PathTemplate, Msg)), - body => render_request_body(BodyTemplate, Msg), - headers => render_headers(HeadersTemplate, Msg), + method => make_method(render_template_string(MethodTemplate, RenderTemplateFun, Msg)), + path => unicode:characters_to_list(RenderTemplateFun(PathTemplate, Msg)), + body => render_request_body(BodyTemplate, RenderTemplateFun, Msg), + headers => render_headers(HeadersTemplate, RenderTemplateFun, Msg), request_timeout => ReqTimeout }. -render_request_body(undefined, Msg) -> +render_request_body(undefined, _, Msg) -> emqx_utils_json:encode(Msg); -render_request_body(BodyTks, Msg) -> - render_template(BodyTks, Msg). +render_request_body(BodyTks, RenderTmplFunc, Msg) -> + RenderTmplFunc(BodyTks, Msg). -render_headers(HeaderTks, Msg) -> +render_headers(HeaderTks, RenderTmplFunc, Msg) -> lists:map( fun({K, V}) -> { - render_template_string(K, Msg), - render_template_string(emqx_secret:unwrap(V), Msg) + render_template_string(K, RenderTmplFunc, Msg), + render_template_string(emqx_secret:unwrap(V), RenderTmplFunc, Msg) } end, HeaderTks @@ -710,8 +714,8 @@ render_template(Template, Msg) -> {String, _Errors} = emqx_template:render(Template, {emqx_jsonish, Msg}), String. -render_template_string(Template, Msg) -> - unicode:characters_to_binary(render_template(Template, Msg)). +render_template_string(Template, RenderTmplFunc, Msg) -> + unicode:characters_to_binary(RenderTmplFunc(Template, Msg)). make_method(M) when M == <<"POST">>; M == <<"post">> -> post; make_method(M) when M == <<"PUT">>; M == <<"put">> -> put; diff --git a/rel/i18n/emqx_bridge_es.hocon b/rel/i18n/emqx_bridge_es.hocon index f5d0f3c02..8ad11f05b 100644 --- a/rel/i18n/emqx_bridge_es.hocon +++ b/rel/i18n/emqx_bridge_es.hocon @@ -56,6 +56,12 @@ config_routing.desc: config_routing.label: """Routing""" +config_doc_as_upsert.desc: +"""Instead of sending a partial doc plus an upsert doc, +you can set doc_as_upsert to true to use the contents of doc as the upsert value.""" +config_doc_as_upsert.label: +"""doc_as_upsert""" + config_wait_for_active_shards.desc: """The number of shard copies that must be active before proceeding with the operation. Set to all or any positive integer up to the total number of shards in the index (number_of_replicas+1).