diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl b/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl index 8b68af10f..256645c31 100644 --- a/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl +++ b/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl @@ -34,6 +34,7 @@ ]). -export([render_template/2]). +-export([convert_server/2]). %% emqx_connector_resource behaviour callbacks -export([connector_config/2]). @@ -92,7 +93,7 @@ connector_example_values() -> <<"username">> => <<"root">>, <<"password">> => <<"******">> }, - base_url => <<"http://127.0.0.1:9200/">>, + server => <<"127.0.0.1:9200">>, connect_timeout => <<"15s">>, pool_type => <<"random">>, pool_size => 8, @@ -116,14 +117,7 @@ fields(config) -> fields("connection_fields"); fields("connection_fields") -> [ - {base_url, - ?HOCON( - emqx_schema:url(), - #{ - required => true, - desc => ?DESC(emqx_bridge_es, "config_base_url") - } - )}, + {server, server()}, {authentication, ?HOCON( ?UNION([?R_REF(auth_basic)]), @@ -158,30 +152,36 @@ desc(auth_basic) -> "Basic Authentication"; desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> ["Configuration for Elastic Search using `", string:to_upper(Method), "` method."]; +desc("server") -> + ?DESC("server"); desc(_) -> undefined. +server() -> + Meta = #{ + required => true, + default => <<"127.0.0.1:9200">>, + desc => ?DESC("server"), + converter => fun ?MODULE:convert_server/2 + }, + emqx_schema:servers_sc(Meta, #{default_port => 9200}). + +convert_server(<<"http://", Server/binary>>, HoconOpts) -> + convert_server(Server, HoconOpts); +convert_server(<<"https://", Server/binary>>, HoconOpts) -> + convert_server(Server, HoconOpts); +convert_server(Server0, HoconOpts) -> + Server = string:trim(Server0, trailing, "/"), + emqx_schema:convert_servers(Server, HoconOpts). + connector_config(Conf, #{name := Name, parse_confs := ParseConfs}) -> - #{ - base_url := BaseUrl, - authentication := - #{ - username := Username, - password := Password0 - } - } = Conf, - - Password = emqx_secret:unwrap(Password0), - Base64 = base64:encode(<>), - BasicToken = <<"Basic ", Base64/binary>>, - WebhookConfig = Conf#{ method => <<"post">>, - url => BaseUrl, + url => base_url(Conf), headers => [ {<<"Content-type">>, <<"application/json">>}, - {<<"Authorization">>, BasicToken} + {<<"Authorization">>, basic_token(Conf)} ] }, ParseConfs( @@ -190,6 +190,19 @@ connector_config(Conf, #{name := Name, parse_confs := ParseConfs}) -> WebhookConfig ). +basic_token(#{ + authentication := + #{ + username := Username, + password := Password0 + } +}) -> + Password = emqx_secret:unwrap(Password0), + Base64 = base64:encode(<>), + <<"Basic ", Base64/binary>>. + +base_url(#{ssl := #{enable := true}, server := Server}) -> "https://" ++ Server; +base_url(#{server := Server}) -> "http://" ++ Server. %%------------------------------------------------------------------------------------- %% `emqx_resource' API %%------------------------------------------------------------------------------------- @@ -316,6 +329,10 @@ on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) -> {error, not_exists} end. +render_template([<<"update_without_doc_template">>], Msg) -> + emqx_utils_json:encode(#{<<"doc">> => Msg}); +render_template([<<"create_without_doc_template">>], Msg) -> + emqx_utils_json:encode(#{<<"doc">> => Msg, <<"doc_as_upsert">> => true}); render_template(Template, Msg) -> % Ignoring errors here, undefined bindings will be replaced with empty string. Opts = #{var_trans => fun to_string/2}, @@ -395,6 +412,11 @@ get_body_template(#{action := update, doc := Doc} = Template) -> false -> <<"{\"doc\":", Doc/binary, "}">>; true -> <<"{\"doc\":", Doc/binary, ",\"doc_as_upsert\": true}">> end; +get_body_template(#{action := update} = Template) -> + case maps:get(doc_as_upsert, Template, false) of + false -> <<"update_without_doc_template">>; + true -> <<"create_without_doc_template">> + end; get_body_template(#{doc := Doc}) -> Doc; get_body_template(_) -> diff --git a/apps/emqx_bridge_es/test/emqx_bridge_es_SUITE.erl b/apps/emqx_bridge_es/test/emqx_bridge_es_SUITE.erl index a9ff70957..530eb77b2 100644 --- a/apps/emqx_bridge_es/test/emqx_bridge_es_SUITE.erl +++ b/apps/emqx_bridge_es/test/emqx_bridge_es_SUITE.erl @@ -103,13 +103,13 @@ end_per_testcase(_TestCase, _Config) -> %% Helper fns %%------------------------------------------------------------------------------------- -check_send_message_with_action(Topic, ActionName, ConnectorName) -> +check_send_message_with_action(Topic, ActionName, ConnectorName, Expect) -> send_message(Topic), %% ###################################### %% Check if message is sent to es %% ###################################### timer:sleep(500), - check_action_metrics(ActionName, ConnectorName). + check_action_metrics(ActionName, ConnectorName, Expect). send_message(Topic) -> Now = emqx_utils_calendar:now_to_rfc3339(microsecond), @@ -123,7 +123,7 @@ send_message(Topic) -> ok = emqtt:publish(Client, Topic, Payload, [{qos, 0}]), ok. -check_action_metrics(ActionName, ConnectorName) -> +check_action_metrics(ActionName, ConnectorName, Expect) -> ActionId = emqx_bridge_v2:id(?TYPE, ActionName, ConnectorName), Metrics = #{ @@ -134,13 +134,7 @@ check_action_metrics(ActionName, ConnectorName) -> dropped => emqx_resource_metrics:dropped_get(ActionId) }, ?assertEqual( - #{ - match => 1, - success => 1, - dropped => 0, - failed => 0, - queuing => 0 - }, + Expect, Metrics, {ActionName, ConnectorName, ActionId} ). @@ -169,11 +163,10 @@ action(ConnectorName) -> } }. -base_url(Config) -> +server(Config) -> Host = ?config(es_host, Config), Port = ?config(es_port, Config), iolist_to_binary([ - "https://", Host, ":", integer_to_binary(Port) @@ -185,7 +178,7 @@ connector_config(Config) -> connector_config(Overrides, Config) -> Defaults = #{ - <<"base_url">> => base_url(Config), + <<"server">> => server(Config), <<"enable">> => true, <<"authentication">> => #{ <<"password">> => <<"emqx123">>, @@ -249,7 +242,7 @@ t_create_remove_list(Config) -> ok. %% Test sending a message to a bridge V2 -t_send_message(Config) -> +t_create_message(Config) -> ConnectorConfig = connector_config(Config), {ok, _} = emqx_connector:create(?TYPE, test_connector2, ConnectorConfig), ActionConfig = action(<<"test_connector2">>), @@ -262,7 +255,8 @@ t_send_message(Config) -> }, {ok, _} = emqx_rule_engine:create_rule(Rule), %% Use the action to send a message - check_send_message_with_action(<<"es/1">>, test_action_1, test_connector2), + Expect = #{match => 1, success => 1, dropped => 0, failed => 0, queuing => 0}, + check_send_message_with_action(<<"es/1">>, test_action_1, test_connector2, Expect), %% Create a few more bridges with the same connector and test them ActionNames1 = lists:foldl( @@ -279,7 +273,7 @@ t_send_message(Config) -> }, {ok, _} = emqx_rule_engine:create_rule(Rule1), Topic = <<"es/", Seq/binary>>, - check_send_message_with_action(Topic, ActionName, test_connector2), + check_send_message_with_action(Topic, ActionName, test_connector2, Expect), [ActionName | Acc] end, [], @@ -294,6 +288,74 @@ t_send_message(Config) -> ActionNames ), emqx_connector:remove(?TYPE, test_connector2), + lists:foreach( + fun(#{id := Id}) -> + emqx_rule_engine:delete_rule(Id) + end, + emqx_rule_engine:get_rules() + ), + ok. + +t_update_message(Config) -> + ConnectorConfig = connector_config(Config), + {ok, _} = emqx_connector:create(?TYPE, update_connector, ConnectorConfig), + ActionConfig0 = action(<<"update_connector">>), + DocId = emqx_guid:to_hexstr(emqx_guid:gen()), + ActionConfig1 = ActionConfig0#{ + <<"parameters">> => #{ + <<"index">> => <<"${payload.index}">>, + <<"id">> => DocId, + <<"max_retries">> => 0, + <<"action">> => <<"update">>, + <<"doc">> => <<"${payload.doc}">> + } + }, + {ok, _} = emqx_bridge_v2:create(?TYPE, update_action, ActionConfig1), + Rule = #{ + id => <<"rule:t_es_1">>, + sql => <<"SELECT\n *\nFROM\n \"es/#\"">>, + actions => [<<"elasticsearch:update_action">>], + description => <<"sink doc to elasticsearch">> + }, + {ok, _} = emqx_rule_engine:create_rule(Rule), + %% failed to update a nonexistent doc + Expect0 = #{match => 1, success => 0, dropped => 0, failed => 1, queuing => 0}, + check_send_message_with_action(<<"es/1">>, update_action, update_connector, Expect0), + %% doc_as_upsert to insert a new doc + ActionConfig2 = ActionConfig1#{ + <<"parameters">> => #{ + <<"index">> => <<"${payload.index}">>, + <<"id">> => DocId, + <<"action">> => <<"update">>, + <<"doc">> => <<"${payload.doc}">>, + <<"doc_as_upsert">> => true, + <<"max_retries">> => 0 + } + }, + {ok, _} = emqx_bridge_v2:create(?TYPE, update_action, ActionConfig2), + Expect1 = #{match => 1, success => 1, dropped => 0, failed => 0, queuing => 0}, + check_send_message_with_action(<<"es/1">>, update_action, update_connector, Expect1), + %% update without doc, use msg as default + ActionConfig3 = ActionConfig1#{ + <<"parameters">> => #{ + <<"index">> => <<"${payload.index}">>, + <<"id">> => DocId, + <<"action">> => <<"update">>, + <<"max_retries">> => 0 + } + }, + {ok, _} = emqx_bridge_v2:create(?TYPE, update_action, ActionConfig3), + Expect2 = #{match => 1, success => 1, dropped => 0, failed => 0, queuing => 0}, + check_send_message_with_action(<<"es/1">>, update_action, update_connector, Expect2), + %% Clean + ok = emqx_bridge_v2:remove(?TYPE, update_action), + emqx_connector:remove(?TYPE, update_connector), + lists:foreach( + fun(#{id := Id}) -> + emqx_rule_engine:delete_rule(Id) + end, + emqx_rule_engine:get_rules() + ), ok. %% Test that we can get the status of the bridge V2 @@ -314,7 +376,7 @@ t_bad_url(Config) -> ActionName = <<"test_action">>, ActionConfig = action(<<"test_connector">>), ConnectorConfig0 = connector_config(Config), - ConnectorConfig = ConnectorConfig0#{<<"base_url">> := <<"bad_host:9092">>}, + ConnectorConfig = ConnectorConfig0#{<<"server">> := <<"bad_host:9092">>}, ?assertMatch({ok, _}, create_connector(ConnectorName, ConnectorConfig)), ?assertMatch({ok, _}, create_action(ActionName, ActionConfig)), ?assertMatch( diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl index 478486e5b..809f47e49 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl @@ -865,7 +865,8 @@ convert_server(<<"http://", Server/binary>>, HoconOpts) -> convert_server(Server, HoconOpts); convert_server(<<"https://", Server/binary>>, HoconOpts) -> convert_server(Server, HoconOpts); -convert_server(Server, HoconOpts) -> +convert_server(Server0, HoconOpts) -> + Server = string:trim(Server0, trailing, "/"), emqx_schema:convert_servers(Server, HoconOpts). str(A) when is_atom(A) -> diff --git a/rel/i18n/emqx_bridge_es.hocon b/rel/i18n/emqx_bridge_es.hocon index 8ad11f05b..1cff9dbb9 100644 --- a/rel/i18n/emqx_bridge_es.hocon +++ b/rel/i18n/emqx_bridge_es.hocon @@ -35,11 +35,6 @@ config_auth_basic_password.desc: config_auth_basic_password.label: """HTTP Basic Auth Password""" -config_base_url.desc: -"""The base URL of the external ElasticSearch service's REST interface.""" -config_base_url.label: -"""ElasticSearch REST Service Base URL""" - config_target.desc: """Name of the data stream, index, or index alias to perform bulk actions on""" diff --git a/rel/i18n/emqx_bridge_es_connector.hocon b/rel/i18n/emqx_bridge_es_connector.hocon index ddd53e0fc..21406b840 100644 --- a/rel/i18n/emqx_bridge_es_connector.hocon +++ b/rel/i18n/emqx_bridge_es_connector.hocon @@ -1,5 +1,13 @@ emqx_bridge_es_connector { +server.desc: +"""The IPv4 or IPv6 address or the hostname to connect to. +A host entry has the following form: `Host[:Port]`. +The Elasticsearch default port 9200 is used if `[:Port]` is not specified.""" + +server.label: +"""Server Host""" + config_authentication.desc: """Authentication configuration"""