refactor: refactor es's action

This commit is contained in:
zhongwencool 2024-01-12 14:15:30 +08:00
parent ef8e056b3b
commit 366eda338d
4 changed files with 219 additions and 428 deletions

View File

@ -31,9 +31,9 @@ fields(action) ->
)};
fields(action_config) ->
emqx_resource_schema:override(
emqx_bridge_v2_schema:make_producer_action_schema(
emqx_bridge_v2_schema:make_consumer_action_schema(
?HOCON(
?R_REF(action_parameters),
?UNION(fun action_union_member_selector/1),
#{
required => true, desc => ?DESC("action_parameters")
}
@ -54,200 +54,28 @@ fields(action_resource_opts) ->
end,
emqx_bridge_v2_schema:resource_opts_fields()
);
fields(action_parameters) ->
fields(action_create) ->
[
{target,
?HOCON(
binary(),
#{
desc => ?DESC("config_target"),
required => false
}
)},
{require_alias,
?HOCON(
boolean(),
#{
required => false,
default => false,
desc => ?DESC("config_require_alias")
}
)},
{routing,
?HOCON(
binary(),
#{
required => false,
desc => ?DESC("config_routing")
}
)},
{wait_for_active_shards,
?HOCON(
?UNION([pos_integer(), all]),
#{
required => false,
desc => ?DESC("config_wait_for_active_shards")
}
)},
{data,
?HOCON(
?ARRAY(
?UNION(
[
?R_REF(create),
?R_REF(delete),
?R_REF(index),
?R_REF(update)
]
)
),
#{
desc => ?DESC("action_parameters_data")
}
)}
] ++
lists:filter(
fun({K, _}) ->
not lists:member(K, [path, method, body, headers, request_timeout])
end,
emqx_bridge_http_schema:fields("parameters_opts")
);
fields(Action) when Action =:= create; Action =:= index ->
[
{action,
?HOCON(
Action,
#{
desc => atom_to_binary(Action),
required => true
}
)},
{'_index',
?HOCON(
binary(),
#{
required => false,
desc => ?DESC("config_parameters_index")
}
)},
{'_id',
?HOCON(
binary(),
#{
required => false,
desc => ?DESC("config_parameters_id")
}
)},
{require_alias,
?HOCON(
binary(),
#{
required => false,
desc => ?DESC("config_parameters_require_alias")
}
)},
{fields,
?HOCON(
binary(),
#{
required => true,
desc => ?DESC("config_parameters_fields")
}
)}
action(create),
index(),
id(false),
doc(true),
routing(),
require_alias(),
overwrite()
| http_common_opts()
];
fields(delete) ->
fields(action_delete) ->
[action(delete), index(), id(true), routing() | http_common_opts()];
fields(action_update) ->
[
{action,
?HOCON(
delete,
#{
desc => <<"Delete">>,
required => true
}
)},
{'_index',
?HOCON(
binary(),
#{
required => false,
desc => ?DESC("config_parameters_index")
}
)},
{'_id',
?HOCON(
binary(),
#{
required => true,
desc => ?DESC("config_parameters_id")
}
)},
{require_alias,
?HOCON(
binary(),
#{
required => false,
desc => ?DESC("config_parameters_require_alias")
}
)}
];
fields(update) ->
[
{action,
?HOCON(
update,
#{
desc => <<"Update">>,
required => true
}
)},
{doc_as_upsert,
?HOCON(
binary(),
#{
required => false,
desc => ?DESC("config_parameters_doc_as_upsert")
}
)},
{upsert,
?HOCON(
binary(),
#{
required => false,
desc => ?DESC("config_parameters_upsert")
}
)},
{'_index',
?HOCON(
binary(),
#{
required => false,
desc => ?DESC("config_parameters_index")
}
)},
{'_id',
?HOCON(
binary(),
#{
required => true,
desc => ?DESC("config_parameters_id")
}
)},
{require_alias,
?HOCON(
binary(),
#{
required => false,
desc => ?DESC("config_parameters_require_alias")
}
)},
{fields,
?HOCON(
binary(),
#{
required => true,
desc => ?DESC("config_parameters_fields")
}
)}
action(update),
index(),
id(true),
doc(true),
routing(),
require_alias()
| http_common_opts()
];
fields("post_bridge_v2") ->
emqx_bridge_schema:type_and_name_fields(elasticsearch) ++ fields(action_config);
@ -256,6 +84,111 @@ fields("put_bridge_v2") ->
fields("get_bridge_v2") ->
emqx_bridge_schema:status_fields() ++ fields("post_bridge_v2").
action_union_member_selector(all_union_members) ->
[
?R_REF(action_create),
?R_REF(action_delete),
?R_REF(action_update)
];
action_union_member_selector({value, Value}) ->
case Value of
#{<<"action">> := <<"create">>} ->
[?R_REF(action_create)];
#{<<"action">> := <<"delete">>} ->
[?R_REF(action_delete)];
#{<<"action">> := <<"update">>} ->
[?R_REF(action_update)];
_ ->
Expected = "create | delete | update",
throw(#{
field_name => action,
expected => Expected
})
end.
action(Action) ->
{action,
?HOCON(
Action,
#{
required => true,
desc => atom_to_binary(Action)
}
)}.
overwrite() ->
{overwrite,
?HOCON(
boolean(),
#{
required => false,
default => true,
desc => ?DESC("config_overwrite")
}
)}.
index() ->
{index,
?HOCON(
binary(),
#{
required => true,
example => <<"${payload.index}">>,
desc => ?DESC("config_parameters_target")
}
)}.
id(Required) ->
{'id',
?HOCON(
binary(),
#{
required => Required,
example => <<"${payload.id}">>,
desc => ?DESC("config_parameters_id")
}
)}.
doc(Required) ->
{'doc',
?HOCON(
binary(),
#{
required => Required,
example => <<"${payload.doc}">>,
desc => ?DESC("config_parameters_doc")
}
)}.
http_common_opts() ->
lists:filter(
fun({K, _}) ->
not lists:member(K, [path, method, body, headers, request_timeout])
end,
emqx_bridge_http_schema:fields("parameters_opts")
).
routing() ->
{routing,
?HOCON(
binary(),
#{
required => false,
example => <<"${payload.routing}">>,
desc => ?DESC("config_routing")
}
)}.
require_alias() ->
{require_alias,
?HOCON(
boolean(),
#{
required => false,
desc => ?DESC("config_require_alias")
}
)}.
bridge_v2_examples(Method) ->
[
#{
@ -272,34 +205,10 @@ bridge_v2_examples(Method) ->
action_values() ->
#{
parameters => #{
target => <<"${target_index}">>,
data => [
#{
action => index,
'_index' => <<"${index}">>,
fields => <<"${fields}">>,
require_alias => <<"${require_alias}">>
},
#{
action => create,
'_index' => <<"${index}">>,
fields => <<"${fields}">>
},
#{
action => delete,
'_index' => <<"${index}">>,
'_id' => <<"${id}">>
},
#{
action => update,
'_index' => <<"${index}">>,
'_id' => <<"${id}">>,
fields => <<"${fields}">>,
require_alias => false,
doc_as_upsert => <<"${doc_as_upsert}">>,
upsert => <<"${upsert}">>
}
]
action => create,
index => <<"${payload.index}">>,
overwrite => true,
doc => <<"${payload.doc}">>
}
}.

View File

@ -233,7 +233,7 @@ on_get_status(InstanceId, State) ->
{ok, pos_integer(), [term()], term()}
| {ok, pos_integer(), [term()]}
| {error, term()}.
on_query(InstanceId, {ChannelId, Msg} = Req, #{channels := Channels} = State) ->
on_query(InstanceId, {ChannelId, Msg} = Req, State) ->
?tp(elasticsearch_bridge_on_query, #{instance_id => InstanceId}),
?SLOG(debug, #{
msg => "elasticsearch_bridge_on_query_called",
@ -241,21 +241,16 @@ on_query(InstanceId, {ChannelId, Msg} = Req, #{channels := Channels} = State) ->
send_message => Req,
state => emqx_utils:redact(State)
}),
case try_render_message(Req, Channels) of
{ok, Body} ->
handle_response(
emqx_bridge_http_connector:on_query(
InstanceId, {ChannelId, {Msg, Body}}, State
)
);
Error ->
Error
end.
handle_response(
emqx_bridge_http_connector:on_query(
InstanceId, {ChannelId, Msg}, State
)
).
-spec on_query_async(manager_id(), tuple(), {function(), [term()]}, state()) ->
{ok, pid()} | {error, empty_request}.
on_query_async(
InstanceId, {ChannelId, Msg} = Req, ReplyFunAndArgs0, #{channels := Channels} = State
InstanceId, {ChannelId, Msg} = Req, ReplyFunAndArgs0, State
) ->
?tp(elasticsearch_bridge_on_query_async, #{instance_id => InstanceId}),
?SLOG(debug, #{
@ -264,22 +259,17 @@ on_query_async(
send_message => Req,
state => emqx_utils:redact(State)
}),
case try_render_message(Req, Channels) of
{ok, Payload} ->
ReplyFunAndArgs =
{
fun(Result) ->
Response = handle_response(Result),
emqx_resource:apply_reply_fun(ReplyFunAndArgs0, Response)
end,
[]
},
emqx_bridge_http_connector:on_query_async(
InstanceId, {ChannelId, {Msg, Payload}}, ReplyFunAndArgs, State
);
Error ->
Error
end.
ReplyFunAndArgs =
{
fun(Result) ->
Response = handle_response(Result),
emqx_resource:apply_reply_fun(ReplyFunAndArgs0, Response)
end,
[]
},
emqx_bridge_http_connector:on_query_async(
InstanceId, {ChannelId, Msg}, ReplyFunAndArgs, State
).
on_add_channel(
InstanceId,
@ -291,19 +281,17 @@ on_add_channel(
true ->
{error, already_exists};
_ ->
#{data := Data} = Parameter,
Parameter1 = Parameter#{path => path(Parameter), method => <<"post">>},
Parameter1 = Parameter#{
path => path(Parameter),
method => method(Parameter),
body => get_body_template(Parameter)
},
{ok, State} = emqx_bridge_http_connector:on_add_channel(
InstanceId, State0, ChannelId, #{parameters => Parameter1}
),
case preproc_data_template(Data) of
[] ->
{error, invalid_data};
DataTemplate ->
Channel = Parameter1#{data => DataTemplate},
Channels2 = Channels#{ChannelId => Channel},
{ok, State#{channels => Channels2}}
end
Channel = Parameter1,
Channels2 = Channels#{ChannelId => Channel},
{ok, State#{channels => Channels2}}
end.
on_remove_channel(InstanceId, #{channels := Channels} = OldState0, ChannelId) ->
@ -325,124 +313,59 @@ on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) ->
%%--------------------------------------------------------------------
%% Internal Functions
%%--------------------------------------------------------------------
path(Param) ->
Target = maps:get(target, Param, undefined),
QString0 = maps:fold(
fun(K, V, Acc) ->
[[atom_to_list(K), "=", to_str(V)] | Acc]
%% delete DELETE /<index>/_doc/<_id>
path(#{action := delete, id := Id, index := Index} = Action) ->
BasePath = ["/", Index, "/_doc/", Id],
Qs = add_query_string([routing], Action),
BasePath ++ Qs;
%% update POST /<index>/_update/<_id>
path(#{action := update, id := Id, index := Index} = Action) ->
BasePath = ["/", Index, "/_update/", Id],
Qs = add_query_string([routing, require_alias], Action),
BasePath ++ Qs;
%% create with id /<index>/_doc/_id
path(#{action := create, index := Index, id := Id} = Action) ->
BasePath = ["/", Index, "/_doc/", Id],
Qs =
case maps:get(overwrite, Action, true) of
true ->
add_query_string([routing, require_alias], Action);
false ->
Action1 = Action#{op_type => "create"},
add_query_string([routing, require_alias, op_type], Action1)
end,
[["_source=false"], ["filter_path=items.*.error"]],
maps:with([require_alias, routing, wait_for_active_shards], Param)
),
QString = "?" ++ lists:join("&", QString0),
target(Target) ++ QString.
BasePath ++ Qs;
%% create without id POST /<index>/_doc/
path(#{action := create, index := Index} = Action) ->
BasePath = ["/", Index, "/_doc/"],
Qs = add_query_string([routing, require_alias], Action),
BasePath ++ Qs.
target(undefined) -> "/_bulk";
target(Str) -> "/" ++ binary_to_list(Str) ++ "/_bulk".
method(#{action := create}) -> <<"POST">>;
method(#{action := delete}) -> <<"DELETE">>;
method(#{action := update}) -> <<"POST">>.
add_query_string(_Keys, Param) when map_size(Param) =:= 0 -> "";
add_query_string(Keys, Param) ->
QString =
maps:fold(
fun(K, V, Acc) ->
[[atom_to_list(K), "=", to_str(V)] | Acc]
end,
[],
maps:with(Keys, Param)
),
"?" ++ lists:join("&", QString).
to_str(List) when is_list(List) -> List;
to_str(false) -> "false";
to_str(true) -> "true";
to_str(Atom) when is_atom(Atom) -> atom_to_list(Atom).
proc_data(DataList, Msg) when is_list(DataList) ->
[
begin
proc_data(Data, Msg)
end
|| Data <- DataList
];
proc_data(
#{
action := Action,
'_index' := IndexT,
'_id' := IdT,
require_alias := RequiredAliasT,
fields := FieldsT
},
Msg
) when Action =:= create; Action =:= index ->
[
emqx_utils_json:encode(
#{
Action => filter([
{'_index', emqx_placeholder:proc_tmpl(IndexT, Msg)},
{'_id', emqx_placeholder:proc_tmpl(IdT, Msg)},
{required_alias, emqx_placeholder:proc_tmpl(RequiredAliasT, Msg)}
])
}
),
"\n",
emqx_placeholder:proc_tmpl(FieldsT, Msg),
"\n"
];
proc_data(
#{
action := delete,
'_index' := IndexT,
'_id' := IdT,
require_alias := RequiredAliasT
},
Msg
) ->
[
emqx_utils_json:encode(
#{
delete => filter([
{'_index', emqx_placeholder:proc_tmpl(IndexT, Msg)},
{'_id', emqx_placeholder:proc_tmpl(IdT, Msg)},
{required_alias, emqx_placeholder:proc_tmpl(RequiredAliasT, Msg)}
])
}
),
"\n"
];
proc_data(
#{
action := update,
'_index' := IndexT,
'_id' := IdT,
require_alias := RequiredAliasT,
doc_as_upsert := DocAsUpsert,
upsert := Upsert,
fields := FieldsT
},
Msg
) ->
[
emqx_utils_json:encode(
#{
update => filter([
{'_index', emqx_placeholder:proc_tmpl(IndexT, Msg)},
{'_id', emqx_placeholder:proc_tmpl(IdT, Msg)},
{required_alias, emqx_placeholder:proc_tmpl(RequiredAliasT, Msg)},
{doc_as_upsert, emqx_placeholder:proc_tmpl(DocAsUpsert, Msg)},
{upsert, emqx_placeholder:proc_tmpl(Upsert, Msg)}
])
}
),
"\n{\"doc\":",
emqx_placeholder:proc_tmpl(FieldsT, Msg),
"}\n"
].
filter(List) ->
Fun = fun
({_K, V}) when V =:= undefined; V =:= <<"undefined">>; V =:= "undefined" ->
false;
({_K, V}) when V =:= ""; V =:= <<>> ->
false;
({_K, V}) when V =:= "false" -> {true, false};
({_K, V}) when V =:= "true" -> {true, true};
({_K, _V}) ->
true
end,
maps:from_list(lists:filtermap(Fun, List)).
handle_response({ok, 200, _Headers, Body} = Resp) ->
eval_response_body(Body, Resp);
handle_response({ok, 200, Body} = Resp) ->
eval_response_body(Body, Resp);
handle_response({ok, Code, _Headers, _Body} = Resp) when Code =:= 200; Code =:= 201 ->
Resp;
handle_response({ok, Code, _Body} = Resp) when Code =:= 200; Code =:= 201 ->
Resp;
handle_response({ok, Code, _Headers, Body}) ->
{error, #{code => Code, body => Body}};
handle_response({ok, Code, Body}) ->
@ -450,49 +373,5 @@ handle_response({ok, Code, Body}) ->
handle_response({error, _} = Error) ->
Error.
eval_response_body(<<"{}">>, Resp) -> Resp;
eval_response_body(Body, _Resp) -> {error, emqx_utils_json:decode(Body)}.
preproc_data_template(DataList) when is_list(DataList) ->
[
begin
preproc_data_template(Data)
end
|| Data <- DataList
];
preproc_data_template(#{action := create} = Data) ->
Index = maps:get('_index', Data, ""),
Id = maps:get('_id', Data, ""),
RequiredAlias = maps:get(require_alias, Data, ""),
Fields = maps:get(fields, Data, ""),
#{
action => create,
'_index' => emqx_placeholder:preproc_tmpl(Index),
'_id' => emqx_placeholder:preproc_tmpl(Id),
require_alias => emqx_placeholder:preproc_tmpl(RequiredAlias),
fields => emqx_placeholder:preproc_tmpl(Fields)
};
preproc_data_template(#{action := index} = Data) ->
Data1 = preproc_data_template(Data#{action => create}),
Data1#{action => index};
preproc_data_template(#{action := delete} = Data) ->
Data1 = preproc_data_template(Data#{action => create}),
Data2 = Data1#{action => delete},
maps:remove(fields, Data2);
preproc_data_template(#{action := update} = Data) ->
Data1 = preproc_data_template(Data#{action => index}),
DocAsUpsert = maps:get(doc_as_upsert, Data, ""),
Upsert = maps:get(upsert, Data, ""),
Data1#{
action => update,
doc_as_upsert => emqx_placeholder:preproc_tmpl(DocAsUpsert),
upsert => emqx_placeholder:preproc_tmpl(Upsert)
}.
try_render_message({ChannelId, Msg}, Channels) ->
case maps:find(ChannelId, Channels) of
{ok, #{data := Data}} ->
{ok, proc_data(Data, Msg)};
_ ->
{error, {unrecoverable_error, {invalid_channel_id, ChannelId}}}
end.
get_body_template(#{doc := Doc}) -> Doc;
get_body_template(_) -> undefined.

View File

@ -317,7 +317,7 @@ on_query(InstId, {send_message, Msg}, State) ->
%% BridgeV2 entrypoint
on_query(
InstId,
{ActionId, MsgAndBody},
{ActionId, Msg},
State = #{installed_actions := InstalledActions}
) when is_binary(ActionId) ->
case {maps:get(request, State, undefined), maps:get(ActionId, InstalledActions, undefined)} of
@ -334,10 +334,10 @@ on_query(
body := Body,
headers := Headers,
request_timeout := Timeout
} = process_request_and_action(Request, ActionState, MsgAndBody),
} = process_request_and_action(Request, ActionState, Msg),
%% bridge buffer worker has retry, do not let ehttpc retry
Retry = 2,
ClientId = clientid(MsgAndBody),
ClientId = clientid(Msg),
on_query(
InstId,
{ClientId, Method, {Path, Headers, Body}, Timeout, Retry},
@ -430,7 +430,7 @@ on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) ->
%% BridgeV2 entrypoint
on_query_async(
InstId,
{ActionId, MsgAndBody},
{ActionId, Msg},
ReplyFunAndArgs,
State = #{installed_actions := InstalledActions}
) when is_binary(ActionId) ->
@ -448,8 +448,8 @@ on_query_async(
body := Body,
headers := Headers,
request_timeout := Timeout
} = process_request_and_action(Request, ActionState, MsgAndBody),
ClientId = clientid(MsgAndBody),
} = process_request_and_action(Request, ActionState, Msg),
ClientId = clientid(Msg),
on_query_async(
InstId,
{ClientId, Method, {Path, Headers, Body}, Timeout},
@ -629,7 +629,7 @@ maybe_parse_template(Key, Conf) ->
parse_template(String) ->
emqx_template:parse(String).
process_request_and_action(Request, ActionState, {Msg, Body}) ->
process_request_and_action(Request, ActionState, Msg) ->
MethodTemplate = maps:get(method, ActionState),
Method = make_method(render_template_string(MethodTemplate, Msg)),
PathPrefix = unicode:characters_to_list(render_template(maps:get(path, Request), Msg)),
@ -647,17 +647,15 @@ process_request_and_action(Request, ActionState, {Msg, Body}) ->
render_headers(HeadersTemplate1, Msg),
render_headers(HeadersTemplate2, Msg)
),
BodyTemplate = maps:get(body, ActionState),
Body = render_request_body(BodyTemplate, Msg),
#{
method => Method,
path => Path,
body => Body,
headers => Headers,
request_timeout => maps:get(request_timeout, ActionState)
};
process_request_and_action(Request, ActionState, Msg) ->
BodyTemplate = maps:get(body, ActionState),
Body = render_request_body(BodyTemplate, Msg),
process_request_and_action(Request, ActionState, {Msg, Body}).
}.
merge_proplist(Proplist1, Proplist2) ->
lists:foldl(
@ -877,7 +875,6 @@ redact_request({Path, Headers}) ->
redact_request({Path, Headers, _Body}) ->
{Path, Headers, <<"******">>}.
clientid({Msg, _Body}) -> clientid(Msg);
clientid(Msg) -> maps:get(clientid, Msg, undefined).
-ifdef(TEST).

View File

@ -126,4 +126,10 @@ action_parameters.desc:
action_parameters.label:
"""Parameters"""
config_overwrite.desc:
"""Set to false If a document with the specified _id already exists(conflict), the operation will failed."""
config_overwrite.label:
"""overwrite"""
}