244 lines
6.3 KiB
Erlang
244 lines
6.3 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%--------------------------------------------------------------------
|
|
-module(emqx_bridge_es).
|
|
|
|
-include("emqx_bridge_es.hrl").
|
|
-include_lib("typerefl/include/types.hrl").
|
|
-include_lib("hocon/include/hoconsc.hrl").
|
|
-include_lib("emqx_resource/include/emqx_resource.hrl").
|
|
|
|
-export([bridge_v2_examples/1]).
|
|
|
|
%% hocon_schema API
|
|
-export([namespace/0, roots/0, fields/1, desc/1]).
|
|
|
|
-define(CONNECTOR_TYPE, elasticsearch).
|
|
-define(ACTION_TYPE, ?CONNECTOR_TYPE).
|
|
|
|
namespace() -> "bridge_elasticsearch".
|
|
|
|
roots() -> [].
|
|
|
|
fields(action) ->
|
|
{elasticsearch,
|
|
?HOCON(
|
|
?MAP(action_name, ?R_REF(action_config)),
|
|
#{
|
|
required => false,
|
|
desc => ?DESC(elasticsearch)
|
|
}
|
|
)};
|
|
fields(action_config) ->
|
|
emqx_resource_schema:override(
|
|
emqx_bridge_v2_schema:make_consumer_action_schema(
|
|
?HOCON(
|
|
?UNION(fun action_union_member_selector/1),
|
|
#{
|
|
required => true, desc => ?DESC("action_parameters")
|
|
}
|
|
)
|
|
),
|
|
[
|
|
{resource_opts,
|
|
?HOCON(?R_REF(action_resource_opts), #{
|
|
default => #{},
|
|
desc => ?DESC(emqx_resource_schema, "resource_opts")
|
|
})}
|
|
]
|
|
);
|
|
fields(action_resource_opts) ->
|
|
lists:filter(
|
|
fun({K, _V}) ->
|
|
not lists:member(K, unsupported_opts())
|
|
end,
|
|
emqx_bridge_v2_schema:action_resource_opts_fields()
|
|
);
|
|
fields(action_create) ->
|
|
[
|
|
action(create),
|
|
index(),
|
|
id(false),
|
|
doc(),
|
|
routing(),
|
|
require_alias(),
|
|
overwrite()
|
|
| http_common_opts()
|
|
];
|
|
fields(action_delete) ->
|
|
[action(delete), index(), id(true), routing() | http_common_opts()];
|
|
fields(action_update) ->
|
|
[
|
|
action(update),
|
|
index(),
|
|
id(true),
|
|
doc(),
|
|
doc_as_upsert(),
|
|
routing(),
|
|
require_alias()
|
|
| http_common_opts()
|
|
];
|
|
fields("post_bridge_v2") ->
|
|
emqx_bridge_schema:type_and_name_fields(elasticsearch) ++ fields(action_config);
|
|
fields("put_bridge_v2") ->
|
|
fields(action_config);
|
|
fields("get_bridge_v2") ->
|
|
emqx_bridge_schema:status_fields() ++ fields("post_bridge_v2").
|
|
|
|
action_union_member_selector(all_union_members) ->
|
|
[
|
|
?R_REF(action_create),
|
|
?R_REF(action_delete),
|
|
?R_REF(action_update)
|
|
];
|
|
action_union_member_selector({value, Value}) ->
|
|
case Value of
|
|
#{<<"action">> := <<"create">>} ->
|
|
[?R_REF(action_create)];
|
|
#{<<"action">> := <<"delete">>} ->
|
|
[?R_REF(action_delete)];
|
|
#{<<"action">> := <<"update">>} ->
|
|
[?R_REF(action_update)];
|
|
#{<<"action">> := Action} when is_atom(Action) ->
|
|
Value1 = Value#{<<"action">> => atom_to_binary(Action)},
|
|
action_union_member_selector({value, Value1});
|
|
Actual ->
|
|
Expected = "create | delete | update",
|
|
throw(#{
|
|
field_name => action,
|
|
actual => Actual,
|
|
expected => Expected
|
|
})
|
|
end.
|
|
|
|
action(Action) ->
|
|
{action,
|
|
?HOCON(
|
|
Action,
|
|
#{
|
|
required => true,
|
|
desc => atom_to_binary(Action)
|
|
}
|
|
)}.
|
|
|
|
overwrite() ->
|
|
{overwrite,
|
|
?HOCON(
|
|
boolean(),
|
|
#{
|
|
required => false,
|
|
default => true,
|
|
desc => ?DESC("config_overwrite")
|
|
}
|
|
)}.
|
|
|
|
index() ->
|
|
{index,
|
|
?HOCON(
|
|
emqx_schema:template(),
|
|
#{
|
|
required => true,
|
|
example => <<"${payload.index}">>,
|
|
desc => ?DESC("config_parameters_index")
|
|
}
|
|
)}.
|
|
|
|
id(Required) ->
|
|
{id,
|
|
?HOCON(
|
|
emqx_schema:template(),
|
|
#{
|
|
required => Required,
|
|
example => <<"${payload.id}">>,
|
|
desc => ?DESC("config_parameters_id")
|
|
}
|
|
)}.
|
|
|
|
doc() ->
|
|
{doc,
|
|
?HOCON(
|
|
emqx_schema:template(),
|
|
#{
|
|
required => false,
|
|
example => <<"${payload.doc}">>,
|
|
desc => ?DESC("config_parameters_doc")
|
|
}
|
|
)}.
|
|
|
|
http_common_opts() ->
|
|
lists:filter(
|
|
fun({K, _}) ->
|
|
not lists:member(K, [path, method, body, headers, request_timeout])
|
|
end,
|
|
emqx_bridge_http_schema:fields("parameters_opts")
|
|
).
|
|
|
|
doc_as_upsert() ->
|
|
{doc_as_upsert,
|
|
?HOCON(
|
|
boolean(),
|
|
#{
|
|
required => false,
|
|
default => false,
|
|
desc => ?DESC("config_doc_as_upsert")
|
|
}
|
|
)}.
|
|
|
|
routing() ->
|
|
{routing,
|
|
?HOCON(
|
|
emqx_schema:template(),
|
|
#{
|
|
required => false,
|
|
example => <<"${payload.routing}">>,
|
|
desc => ?DESC("config_routing")
|
|
}
|
|
)}.
|
|
|
|
require_alias() ->
|
|
{require_alias,
|
|
?HOCON(
|
|
boolean(),
|
|
#{
|
|
required => false,
|
|
desc => ?DESC("config_require_alias")
|
|
}
|
|
)}.
|
|
|
|
bridge_v2_examples(Method) ->
|
|
[
|
|
#{
|
|
<<"elasticsearch">> =>
|
|
#{
|
|
summary => <<"Elastic Search Bridge">>,
|
|
value => emqx_bridge_v2_schema:action_values(
|
|
Method, ?ACTION_TYPE, ?CONNECTOR_TYPE, action_values()
|
|
)
|
|
}
|
|
}
|
|
].
|
|
|
|
action_values() ->
|
|
#{
|
|
parameters => #{
|
|
action => create,
|
|
index => <<"${payload.index}">>,
|
|
overwrite => true,
|
|
doc => <<"${payload.doc}">>
|
|
}
|
|
}.
|
|
|
|
unsupported_opts() ->
|
|
[
|
|
batch_size,
|
|
batch_time
|
|
].
|
|
|
|
desc(elasticsearch) -> ?DESC(elasticsearch);
|
|
desc(action_config) -> ?DESC(action_config);
|
|
desc(action_create) -> ?DESC(action_create);
|
|
desc(action_delete) -> ?DESC(action_delete);
|
|
desc(action_update) -> ?DESC(action_update);
|
|
desc(action_resource_opts) -> ?DESC(action_resource_opts);
|
|
desc(_) -> undefined.
|