Merge pull request #12371 from emqx/es-server-schema

chore: es's base_url to server
This commit is contained in:
zhongwencool 2024-01-23 17:17:19 +08:00 committed by GitHub
commit a8eefe808f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 135 additions and 47 deletions

View File

@ -34,6 +34,7 @@
]). ]).
-export([render_template/2]). -export([render_template/2]).
-export([convert_server/2]).
%% emqx_connector_resource behaviour callbacks %% emqx_connector_resource behaviour callbacks
-export([connector_config/2]). -export([connector_config/2]).
@ -92,7 +93,7 @@ connector_example_values() ->
<<"username">> => <<"root">>, <<"username">> => <<"root">>,
<<"password">> => <<"******">> <<"password">> => <<"******">>
}, },
base_url => <<"http://127.0.0.1:9200/">>, server => <<"127.0.0.1:9200">>,
connect_timeout => <<"15s">>, connect_timeout => <<"15s">>,
pool_type => <<"random">>, pool_type => <<"random">>,
pool_size => 8, pool_size => 8,
@ -116,14 +117,7 @@ fields(config) ->
fields("connection_fields"); fields("connection_fields");
fields("connection_fields") -> fields("connection_fields") ->
[ [
{base_url, {server, server()},
?HOCON(
emqx_schema:url(),
#{
required => true,
desc => ?DESC(emqx_bridge_es, "config_base_url")
}
)},
{authentication, {authentication,
?HOCON( ?HOCON(
?UNION([?R_REF(auth_basic)]), ?UNION([?R_REF(auth_basic)]),
@ -158,30 +152,36 @@ desc(auth_basic) ->
"Basic Authentication"; "Basic Authentication";
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for Elastic Search using `", string:to_upper(Method), "` method."]; ["Configuration for Elastic Search using `", string:to_upper(Method), "` method."];
desc("server") ->
?DESC("server");
desc(_) -> desc(_) ->
undefined. undefined.
server() ->
Meta = #{
required => true,
default => <<"127.0.0.1:9200">>,
desc => ?DESC("server"),
converter => fun ?MODULE:convert_server/2
},
emqx_schema:servers_sc(Meta, #{default_port => 9200}).
convert_server(<<"http://", Server/binary>>, HoconOpts) ->
convert_server(Server, HoconOpts);
convert_server(<<"https://", Server/binary>>, HoconOpts) ->
convert_server(Server, HoconOpts);
convert_server(Server0, HoconOpts) ->
Server = string:trim(Server0, trailing, "/"),
emqx_schema:convert_servers(Server, HoconOpts).
connector_config(Conf, #{name := Name, parse_confs := ParseConfs}) -> connector_config(Conf, #{name := Name, parse_confs := ParseConfs}) ->
#{
base_url := BaseUrl,
authentication :=
#{
username := Username,
password := Password0
}
} = Conf,
Password = emqx_secret:unwrap(Password0),
Base64 = base64:encode(<<Username/binary, ":", Password/binary>>),
BasicToken = <<"Basic ", Base64/binary>>,
WebhookConfig = WebhookConfig =
Conf#{ Conf#{
method => <<"post">>, method => <<"post">>,
url => BaseUrl, url => base_url(Conf),
headers => [ headers => [
{<<"Content-type">>, <<"application/json">>}, {<<"Content-type">>, <<"application/json">>},
{<<"Authorization">>, BasicToken} {<<"Authorization">>, basic_token(Conf)}
] ]
}, },
ParseConfs( ParseConfs(
@ -190,6 +190,19 @@ connector_config(Conf, #{name := Name, parse_confs := ParseConfs}) ->
WebhookConfig WebhookConfig
). ).
basic_token(#{
authentication :=
#{
username := Username,
password := Password0
}
}) ->
Password = emqx_secret:unwrap(Password0),
Base64 = base64:encode(<<Username/binary, ":", Password/binary>>),
<<"Basic ", Base64/binary>>.
base_url(#{ssl := #{enable := true}, server := Server}) -> "https://" ++ Server;
base_url(#{server := Server}) -> "http://" ++ Server.
%%------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------
%% `emqx_resource' API %% `emqx_resource' API
%%------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------
@ -316,6 +329,10 @@ on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) ->
{error, not_exists} {error, not_exists}
end. end.
render_template([<<"update_without_doc_template">>], Msg) ->
emqx_utils_json:encode(#{<<"doc">> => Msg});
render_template([<<"create_without_doc_template">>], Msg) ->
emqx_utils_json:encode(#{<<"doc">> => Msg, <<"doc_as_upsert">> => true});
render_template(Template, Msg) -> render_template(Template, Msg) ->
% Ignoring errors here, undefined bindings will be replaced with empty string. % Ignoring errors here, undefined bindings will be replaced with empty string.
Opts = #{var_trans => fun to_string/2}, Opts = #{var_trans => fun to_string/2},
@ -395,6 +412,11 @@ get_body_template(#{action := update, doc := Doc} = Template) ->
false -> <<"{\"doc\":", Doc/binary, "}">>; false -> <<"{\"doc\":", Doc/binary, "}">>;
true -> <<"{\"doc\":", Doc/binary, ",\"doc_as_upsert\": true}">> true -> <<"{\"doc\":", Doc/binary, ",\"doc_as_upsert\": true}">>
end; end;
get_body_template(#{action := update} = Template) ->
case maps:get(doc_as_upsert, Template, false) of
false -> <<"update_without_doc_template">>;
true -> <<"create_without_doc_template">>
end;
get_body_template(#{doc := Doc}) -> get_body_template(#{doc := Doc}) ->
Doc; Doc;
get_body_template(_) -> get_body_template(_) ->

View File

@ -103,13 +103,13 @@ end_per_testcase(_TestCase, _Config) ->
%% Helper fns %% Helper fns
%%------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------
check_send_message_with_action(Topic, ActionName, ConnectorName) -> check_send_message_with_action(Topic, ActionName, ConnectorName, Expect) ->
send_message(Topic), send_message(Topic),
%% ###################################### %% ######################################
%% Check if message is sent to es %% Check if message is sent to es
%% ###################################### %% ######################################
timer:sleep(500), timer:sleep(500),
check_action_metrics(ActionName, ConnectorName). check_action_metrics(ActionName, ConnectorName, Expect).
send_message(Topic) -> send_message(Topic) ->
Now = emqx_utils_calendar:now_to_rfc3339(microsecond), Now = emqx_utils_calendar:now_to_rfc3339(microsecond),
@ -123,7 +123,7 @@ send_message(Topic) ->
ok = emqtt:publish(Client, Topic, Payload, [{qos, 0}]), ok = emqtt:publish(Client, Topic, Payload, [{qos, 0}]),
ok. ok.
check_action_metrics(ActionName, ConnectorName) -> check_action_metrics(ActionName, ConnectorName, Expect) ->
ActionId = emqx_bridge_v2:id(?TYPE, ActionName, ConnectorName), ActionId = emqx_bridge_v2:id(?TYPE, ActionName, ConnectorName),
Metrics = Metrics =
#{ #{
@ -134,13 +134,7 @@ check_action_metrics(ActionName, ConnectorName) ->
dropped => emqx_resource_metrics:dropped_get(ActionId) dropped => emqx_resource_metrics:dropped_get(ActionId)
}, },
?assertEqual( ?assertEqual(
#{ Expect,
match => 1,
success => 1,
dropped => 0,
failed => 0,
queuing => 0
},
Metrics, Metrics,
{ActionName, ConnectorName, ActionId} {ActionName, ConnectorName, ActionId}
). ).
@ -169,11 +163,10 @@ action(ConnectorName) ->
} }
}. }.
base_url(Config) -> server(Config) ->
Host = ?config(es_host, Config), Host = ?config(es_host, Config),
Port = ?config(es_port, Config), Port = ?config(es_port, Config),
iolist_to_binary([ iolist_to_binary([
"https://",
Host, Host,
":", ":",
integer_to_binary(Port) integer_to_binary(Port)
@ -185,7 +178,7 @@ connector_config(Config) ->
connector_config(Overrides, Config) -> connector_config(Overrides, Config) ->
Defaults = Defaults =
#{ #{
<<"base_url">> => base_url(Config), <<"server">> => server(Config),
<<"enable">> => true, <<"enable">> => true,
<<"authentication">> => #{ <<"authentication">> => #{
<<"password">> => <<"emqx123">>, <<"password">> => <<"emqx123">>,
@ -249,7 +242,7 @@ t_create_remove_list(Config) ->
ok. ok.
%% Test sending a message to a bridge V2 %% Test sending a message to a bridge V2
t_send_message(Config) -> t_create_message(Config) ->
ConnectorConfig = connector_config(Config), ConnectorConfig = connector_config(Config),
{ok, _} = emqx_connector:create(?TYPE, test_connector2, ConnectorConfig), {ok, _} = emqx_connector:create(?TYPE, test_connector2, ConnectorConfig),
ActionConfig = action(<<"test_connector2">>), ActionConfig = action(<<"test_connector2">>),
@ -262,7 +255,8 @@ t_send_message(Config) ->
}, },
{ok, _} = emqx_rule_engine:create_rule(Rule), {ok, _} = emqx_rule_engine:create_rule(Rule),
%% Use the action to send a message %% Use the action to send a message
check_send_message_with_action(<<"es/1">>, test_action_1, test_connector2), Expect = #{match => 1, success => 1, dropped => 0, failed => 0, queuing => 0},
check_send_message_with_action(<<"es/1">>, test_action_1, test_connector2, Expect),
%% Create a few more bridges with the same connector and test them %% Create a few more bridges with the same connector and test them
ActionNames1 = ActionNames1 =
lists:foldl( lists:foldl(
@ -279,7 +273,7 @@ t_send_message(Config) ->
}, },
{ok, _} = emqx_rule_engine:create_rule(Rule1), {ok, _} = emqx_rule_engine:create_rule(Rule1),
Topic = <<"es/", Seq/binary>>, Topic = <<"es/", Seq/binary>>,
check_send_message_with_action(Topic, ActionName, test_connector2), check_send_message_with_action(Topic, ActionName, test_connector2, Expect),
[ActionName | Acc] [ActionName | Acc]
end, end,
[], [],
@ -294,6 +288,74 @@ t_send_message(Config) ->
ActionNames ActionNames
), ),
emqx_connector:remove(?TYPE, test_connector2), emqx_connector:remove(?TYPE, test_connector2),
lists:foreach(
fun(#{id := Id}) ->
emqx_rule_engine:delete_rule(Id)
end,
emqx_rule_engine:get_rules()
),
ok.
t_update_message(Config) ->
ConnectorConfig = connector_config(Config),
{ok, _} = emqx_connector:create(?TYPE, update_connector, ConnectorConfig),
ActionConfig0 = action(<<"update_connector">>),
DocId = emqx_guid:to_hexstr(emqx_guid:gen()),
ActionConfig1 = ActionConfig0#{
<<"parameters">> => #{
<<"index">> => <<"${payload.index}">>,
<<"id">> => DocId,
<<"max_retries">> => 0,
<<"action">> => <<"update">>,
<<"doc">> => <<"${payload.doc}">>
}
},
{ok, _} = emqx_bridge_v2:create(?TYPE, update_action, ActionConfig1),
Rule = #{
id => <<"rule:t_es_1">>,
sql => <<"SELECT\n *\nFROM\n \"es/#\"">>,
actions => [<<"elasticsearch:update_action">>],
description => <<"sink doc to elasticsearch">>
},
{ok, _} = emqx_rule_engine:create_rule(Rule),
%% failed to update a nonexistent doc
Expect0 = #{match => 1, success => 0, dropped => 0, failed => 1, queuing => 0},
check_send_message_with_action(<<"es/1">>, update_action, update_connector, Expect0),
%% doc_as_upsert to insert a new doc
ActionConfig2 = ActionConfig1#{
<<"parameters">> => #{
<<"index">> => <<"${payload.index}">>,
<<"id">> => DocId,
<<"action">> => <<"update">>,
<<"doc">> => <<"${payload.doc}">>,
<<"doc_as_upsert">> => true,
<<"max_retries">> => 0
}
},
{ok, _} = emqx_bridge_v2:create(?TYPE, update_action, ActionConfig2),
Expect1 = #{match => 1, success => 1, dropped => 0, failed => 0, queuing => 0},
check_send_message_with_action(<<"es/1">>, update_action, update_connector, Expect1),
%% update without doc, use msg as default
ActionConfig3 = ActionConfig1#{
<<"parameters">> => #{
<<"index">> => <<"${payload.index}">>,
<<"id">> => DocId,
<<"action">> => <<"update">>,
<<"max_retries">> => 0
}
},
{ok, _} = emqx_bridge_v2:create(?TYPE, update_action, ActionConfig3),
Expect2 = #{match => 1, success => 1, dropped => 0, failed => 0, queuing => 0},
check_send_message_with_action(<<"es/1">>, update_action, update_connector, Expect2),
%% Clean
ok = emqx_bridge_v2:remove(?TYPE, update_action),
emqx_connector:remove(?TYPE, update_connector),
lists:foreach(
fun(#{id := Id}) ->
emqx_rule_engine:delete_rule(Id)
end,
emqx_rule_engine:get_rules()
),
ok. ok.
%% Test that we can get the status of the bridge V2 %% Test that we can get the status of the bridge V2
@ -314,7 +376,7 @@ t_bad_url(Config) ->
ActionName = <<"test_action">>, ActionName = <<"test_action">>,
ActionConfig = action(<<"test_connector">>), ActionConfig = action(<<"test_connector">>),
ConnectorConfig0 = connector_config(Config), ConnectorConfig0 = connector_config(Config),
ConnectorConfig = ConnectorConfig0#{<<"base_url">> := <<"bad_host:9092">>}, ConnectorConfig = ConnectorConfig0#{<<"server">> := <<"bad_host:9092">>},
?assertMatch({ok, _}, create_connector(ConnectorName, ConnectorConfig)), ?assertMatch({ok, _}, create_connector(ConnectorName, ConnectorConfig)),
?assertMatch({ok, _}, create_action(ActionName, ActionConfig)), ?assertMatch({ok, _}, create_action(ActionName, ActionConfig)),
?assertMatch( ?assertMatch(

View File

@ -865,7 +865,8 @@ convert_server(<<"http://", Server/binary>>, HoconOpts) ->
convert_server(Server, HoconOpts); convert_server(Server, HoconOpts);
convert_server(<<"https://", Server/binary>>, HoconOpts) -> convert_server(<<"https://", Server/binary>>, HoconOpts) ->
convert_server(Server, HoconOpts); convert_server(Server, HoconOpts);
convert_server(Server, HoconOpts) -> convert_server(Server0, HoconOpts) ->
Server = string:trim(Server0, trailing, "/"),
emqx_schema:convert_servers(Server, HoconOpts). emqx_schema:convert_servers(Server, HoconOpts).
str(A) when is_atom(A) -> str(A) when is_atom(A) ->

View File

@ -35,11 +35,6 @@ config_auth_basic_password.desc:
config_auth_basic_password.label: config_auth_basic_password.label:
"""HTTP Basic Auth Password""" """HTTP Basic Auth Password"""
config_base_url.desc:
"""The base URL of the external ElasticSearch service's REST interface."""
config_base_url.label:
"""ElasticSearch REST Service Base URL"""
config_target.desc: config_target.desc:
"""Name of the data stream, index, or index alias to perform bulk actions on""" """Name of the data stream, index, or index alias to perform bulk actions on"""

View File

@ -1,5 +1,13 @@
emqx_bridge_es_connector { emqx_bridge_es_connector {
server.desc:
"""The IPv4 or IPv6 address or the hostname to connect to.
A host entry has the following form: `Host[:Port]`.
The Elasticsearch default port 9200 is used if `[:Port]` is not specified."""
server.label:
"""Server Host"""
config_authentication.desc: config_authentication.desc:
"""Authentication configuration""" """Authentication configuration"""