emqx/apps/emqx_bridge_es/src/emqx_bridge_es.erl

244 lines
6.3 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_es).
-include("emqx_bridge_es.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
-export([bridge_v2_examples/1]).
%% hocon_schema API
-export([namespace/0, roots/0, fields/1, desc/1]).
-define(CONNECTOR_TYPE, elasticsearch).
-define(ACTION_TYPE, ?CONNECTOR_TYPE).
namespace() -> "bridge_elasticsearch".
roots() -> [].
fields(action) ->
{elasticsearch,
?HOCON(
?MAP(action_name, ?R_REF(action_config)),
#{
required => false,
desc => ?DESC(elasticsearch)
}
)};
fields(action_config) ->
emqx_resource_schema:override(
emqx_bridge_v2_schema:make_consumer_action_schema(
?HOCON(
?UNION(fun action_union_member_selector/1),
#{
required => true, desc => ?DESC("action_parameters")
}
)
),
[
{resource_opts,
?HOCON(?R_REF(action_resource_opts), #{
default => #{},
desc => ?DESC(emqx_resource_schema, "resource_opts")
})}
]
);
fields(action_resource_opts) ->
lists:filter(
fun({K, _V}) ->
not lists:member(K, unsupported_opts())
end,
emqx_bridge_v2_schema:action_resource_opts_fields()
);
fields(action_create) ->
[
action(create),
index(),
id(false),
doc(),
routing(),
require_alias(),
overwrite()
| http_common_opts()
];
fields(action_delete) ->
[action(delete), index(), id(true), routing() | http_common_opts()];
fields(action_update) ->
[
action(update),
index(),
id(true),
doc(),
doc_as_upsert(),
routing(),
require_alias()
| http_common_opts()
];
fields("post_bridge_v2") ->
emqx_bridge_schema:type_and_name_fields(elasticsearch) ++ fields(action_config);
fields("put_bridge_v2") ->
fields(action_config);
fields("get_bridge_v2") ->
emqx_bridge_schema:status_fields() ++ fields("post_bridge_v2").
action_union_member_selector(all_union_members) ->
[
?R_REF(action_create),
?R_REF(action_delete),
?R_REF(action_update)
];
action_union_member_selector({value, Value}) ->
case Value of
#{<<"action">> := <<"create">>} ->
[?R_REF(action_create)];
#{<<"action">> := <<"delete">>} ->
[?R_REF(action_delete)];
#{<<"action">> := <<"update">>} ->
[?R_REF(action_update)];
#{<<"action">> := Action} when is_atom(Action) ->
Value1 = Value#{<<"action">> => atom_to_binary(Action)},
action_union_member_selector({value, Value1});
Actual ->
Expected = "create | delete | update",
throw(#{
field_name => action,
actual => Actual,
expected => Expected
})
end.
action(Action) ->
{action,
?HOCON(
Action,
#{
required => true,
desc => atom_to_binary(Action)
}
)}.
overwrite() ->
{overwrite,
?HOCON(
boolean(),
#{
required => false,
default => true,
desc => ?DESC("config_overwrite")
}
)}.
index() ->
{index,
?HOCON(
emqx_schema:template(),
#{
required => true,
example => <<"${payload.index}">>,
desc => ?DESC("config_parameters_index")
}
)}.
id(Required) ->
{id,
?HOCON(
emqx_schema:template(),
#{
required => Required,
example => <<"${payload.id}">>,
desc => ?DESC("config_parameters_id")
}
)}.
doc() ->
{doc,
?HOCON(
emqx_schema:template(),
#{
required => false,
example => <<"${payload.doc}">>,
desc => ?DESC("config_parameters_doc")
}
)}.
http_common_opts() ->
lists:filter(
fun({K, _}) ->
not lists:member(K, [path, method, body, headers, request_timeout])
end,
emqx_bridge_http_schema:fields("parameters_opts")
).
doc_as_upsert() ->
{doc_as_upsert,
?HOCON(
boolean(),
#{
required => false,
default => false,
desc => ?DESC("config_doc_as_upsert")
}
)}.
routing() ->
{routing,
?HOCON(
emqx_schema:template(),
#{
required => false,
example => <<"${payload.routing}">>,
desc => ?DESC("config_routing")
}
)}.
require_alias() ->
{require_alias,
?HOCON(
boolean(),
#{
required => false,
desc => ?DESC("config_require_alias")
}
)}.
bridge_v2_examples(Method) ->
[
#{
<<"elasticsearch">> =>
#{
summary => <<"Elastic Search Bridge">>,
value => emqx_bridge_v2_schema:action_values(
Method, ?ACTION_TYPE, ?CONNECTOR_TYPE, action_values()
)
}
}
].
action_values() ->
#{
parameters => #{
action => create,
index => <<"${payload.index}">>,
overwrite => true,
doc => <<"${payload.doc}">>
}
}.
unsupported_opts() ->
[
batch_size,
batch_time
].
desc(elasticsearch) -> ?DESC(elasticsearch);
desc(action_config) -> ?DESC(action_config);
desc(action_create) -> ?DESC(action_create);
desc(action_delete) -> ?DESC(action_delete);
desc(action_update) -> ?DESC(action_update);
desc(action_resource_opts) -> ?DESC(action_resource_opts);
desc(_) -> undefined.