chore: es's base_url to server
This commit is contained in:
parent
2706e005ff
commit
ada2785b5d
|
@ -34,6 +34,7 @@
|
|||
]).
|
||||
|
||||
-export([render_template/2]).
|
||||
-export([convert_server/2]).
|
||||
|
||||
%% emqx_connector_resource behaviour callbacks
|
||||
-export([connector_config/2]).
|
||||
|
@ -92,7 +93,7 @@ connector_example_values() ->
|
|||
<<"username">> => <<"root">>,
|
||||
<<"password">> => <<"******">>
|
||||
},
|
||||
base_url => <<"http://127.0.0.1:9200/">>,
|
||||
server => <<"127.0.0.1:9200">>,
|
||||
connect_timeout => <<"15s">>,
|
||||
pool_type => <<"random">>,
|
||||
pool_size => 8,
|
||||
|
@ -116,14 +117,7 @@ fields(config) ->
|
|||
fields("connection_fields");
|
||||
fields("connection_fields") ->
|
||||
[
|
||||
{base_url,
|
||||
?HOCON(
|
||||
emqx_schema:url(),
|
||||
#{
|
||||
required => true,
|
||||
desc => ?DESC(emqx_bridge_es, "config_base_url")
|
||||
}
|
||||
)},
|
||||
{server, server()},
|
||||
{authentication,
|
||||
?HOCON(
|
||||
?UNION([?R_REF(auth_basic)]),
|
||||
|
@ -158,30 +152,36 @@ desc(auth_basic) ->
|
|||
"Basic Authentication";
|
||||
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
|
||||
["Configuration for Elastic Search using `", string:to_upper(Method), "` method."];
|
||||
desc("server") ->
|
||||
?DESC("server");
|
||||
desc(_) ->
|
||||
undefined.
|
||||
|
||||
server() ->
|
||||
Meta = #{
|
||||
required => true,
|
||||
default => <<"127.0.0.1:9200">>,
|
||||
desc => ?DESC("server"),
|
||||
converter => fun ?MODULE:convert_server/2
|
||||
},
|
||||
emqx_schema:servers_sc(Meta, #{default_port => 9200}).
|
||||
|
||||
convert_server(<<"http://", Server/binary>>, HoconOpts) ->
|
||||
convert_server(Server, HoconOpts);
|
||||
convert_server(<<"https://", Server/binary>>, HoconOpts) ->
|
||||
convert_server(Server, HoconOpts);
|
||||
convert_server(Server0, HoconOpts) ->
|
||||
Server = string:trim(Server0, trailing, "/"),
|
||||
emqx_schema:convert_servers(Server, HoconOpts).
|
||||
|
||||
connector_config(Conf, #{name := Name, parse_confs := ParseConfs}) ->
|
||||
#{
|
||||
base_url := BaseUrl,
|
||||
authentication :=
|
||||
#{
|
||||
username := Username,
|
||||
password := Password0
|
||||
}
|
||||
} = Conf,
|
||||
|
||||
Password = emqx_secret:unwrap(Password0),
|
||||
Base64 = base64:encode(<<Username/binary, ":", Password/binary>>),
|
||||
BasicToken = <<"Basic ", Base64/binary>>,
|
||||
|
||||
WebhookConfig =
|
||||
Conf#{
|
||||
method => <<"post">>,
|
||||
url => BaseUrl,
|
||||
url => base_url(Conf),
|
||||
headers => [
|
||||
{<<"Content-type">>, <<"application/json">>},
|
||||
{<<"Authorization">>, BasicToken}
|
||||
{<<"Authorization">>, basic_token(Conf)}
|
||||
]
|
||||
},
|
||||
ParseConfs(
|
||||
|
@ -190,6 +190,19 @@ connector_config(Conf, #{name := Name, parse_confs := ParseConfs}) ->
|
|||
WebhookConfig
|
||||
).
|
||||
|
||||
basic_token(#{
|
||||
authentication :=
|
||||
#{
|
||||
username := Username,
|
||||
password := Password0
|
||||
}
|
||||
}) ->
|
||||
Password = emqx_secret:unwrap(Password0),
|
||||
Base64 = base64:encode(<<Username/binary, ":", Password/binary>>),
|
||||
<<"Basic ", Base64/binary>>.
|
||||
|
||||
base_url(#{ssl := #{enable := true}, server := Server}) -> "https://" ++ Server;
|
||||
base_url(#{server := Server}) -> "http://" ++ Server.
|
||||
%%-------------------------------------------------------------------------------------
|
||||
%% `emqx_resource' API
|
||||
%%-------------------------------------------------------------------------------------
|
||||
|
@ -316,6 +329,10 @@ on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) ->
|
|||
{error, not_exists}
|
||||
end.
|
||||
|
||||
render_template([<<"update_without_doc_template">>], Msg) ->
|
||||
emqx_utils_json:encode(#{<<"doc">> => Msg});
|
||||
render_template([<<"create_without_doc_template">>], Msg) ->
|
||||
emqx_utils_json:encode(#{<<"doc">> => Msg, <<"doc_as_upsert">> => true});
|
||||
render_template(Template, Msg) ->
|
||||
% Ignoring errors here, undefined bindings will be replaced with empty string.
|
||||
Opts = #{var_trans => fun to_string/2},
|
||||
|
@ -395,6 +412,11 @@ get_body_template(#{action := update, doc := Doc} = Template) ->
|
|||
false -> <<"{\"doc\":", Doc/binary, "}">>;
|
||||
true -> <<"{\"doc\":", Doc/binary, ",\"doc_as_upsert\": true}">>
|
||||
end;
|
||||
get_body_template(#{action := update} = Template) ->
|
||||
case maps:get(doc_as_upsert, Template, false) of
|
||||
false -> <<"update_without_doc_template">>;
|
||||
true -> <<"create_without_doc_template">>
|
||||
end;
|
||||
get_body_template(#{doc := Doc}) ->
|
||||
Doc;
|
||||
get_body_template(_) ->
|
||||
|
|
|
@ -169,11 +169,10 @@ action(ConnectorName) ->
|
|||
}
|
||||
}.
|
||||
|
||||
base_url(Config) ->
|
||||
server(Config) ->
|
||||
Host = ?config(es_host, Config),
|
||||
Port = ?config(es_port, Config),
|
||||
iolist_to_binary([
|
||||
"https://",
|
||||
Host,
|
||||
":",
|
||||
integer_to_binary(Port)
|
||||
|
@ -185,7 +184,7 @@ connector_config(Config) ->
|
|||
connector_config(Overrides, Config) ->
|
||||
Defaults =
|
||||
#{
|
||||
<<"base_url">> => base_url(Config),
|
||||
<<"server">> => server(Config),
|
||||
<<"enable">> => true,
|
||||
<<"authentication">> => #{
|
||||
<<"password">> => <<"emqx123">>,
|
||||
|
@ -314,7 +313,7 @@ t_bad_url(Config) ->
|
|||
ActionName = <<"test_action">>,
|
||||
ActionConfig = action(<<"test_connector">>),
|
||||
ConnectorConfig0 = connector_config(Config),
|
||||
ConnectorConfig = ConnectorConfig0#{<<"base_url">> := <<"bad_host:9092">>},
|
||||
ConnectorConfig = ConnectorConfig0#{<<"server">> := <<"bad_host:9092">>},
|
||||
?assertMatch({ok, _}, create_connector(ConnectorName, ConnectorConfig)),
|
||||
?assertMatch({ok, _}, create_action(ActionName, ActionConfig)),
|
||||
?assertMatch(
|
||||
|
|
|
@ -865,7 +865,8 @@ convert_server(<<"http://", Server/binary>>, HoconOpts) ->
|
|||
convert_server(Server, HoconOpts);
|
||||
convert_server(<<"https://", Server/binary>>, HoconOpts) ->
|
||||
convert_server(Server, HoconOpts);
|
||||
convert_server(Server, HoconOpts) ->
|
||||
convert_server(Server0, HoconOpts) ->
|
||||
Server = string:trim(Server0, trailing, "/"),
|
||||
emqx_schema:convert_servers(Server, HoconOpts).
|
||||
|
||||
str(A) when is_atom(A) ->
|
||||
|
|
|
@ -35,11 +35,6 @@ config_auth_basic_password.desc:
|
|||
config_auth_basic_password.label:
|
||||
"""HTTP Basic Auth Password"""
|
||||
|
||||
config_base_url.desc:
|
||||
"""The base URL of the external ElasticSearch service's REST interface."""
|
||||
config_base_url.label:
|
||||
"""ElasticSearch REST Service Base URL"""
|
||||
|
||||
config_target.desc:
|
||||
"""Name of the data stream, index, or index alias to perform bulk actions on"""
|
||||
|
||||
|
|
|
@ -1,5 +1,13 @@
|
|||
emqx_bridge_es_connector {
|
||||
|
||||
server.desc:
|
||||
"""The IPv4 or IPv6 address or the hostname to connect to.
|
||||
A host entry has the following form: `Host[:Port]`.
|
||||
The Elasticsearch default port 9200 is used if `[:Port]` is not specified."""
|
||||
|
||||
server.label:
|
||||
"""Server Host"""
|
||||
|
||||
config_authentication.desc:
|
||||
"""Authentication configuration"""
|
||||
|
||||
|
|
Loading…
Reference in New Issue