fix: simplify by enabling check_schema

This commit is contained in:
Stefan Strigler 2023-10-18 17:08:12 +02:00 committed by Zaiming (Stone) Shi
parent 2b66018d3b
commit 29ca7f944f
3 changed files with 62 additions and 90 deletions

View File

@ -69,7 +69,7 @@
namespace() -> "connector". namespace() -> "connector".
api_spec() -> api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => false}). emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
paths() -> paths() ->
[ [
@ -347,11 +347,7 @@ schema("/connectors_probe") ->
?OK(zip_connectors(AllConnectors)); ?OK(zip_connectors(AllConnectors));
{error, Reason} -> {error, Reason} ->
?INTERNAL_ERROR(Reason) ?INTERNAL_ERROR(Reason)
end; end.
'/connectors'(post, _Params) ->
?BAD_REQUEST(<<"Bad Request">>);
'/connectors'(_, _) ->
?METHOD_NOT_ALLOWED.
'/connectors/:id'(get, #{bindings := #{id := Id}}) -> '/connectors/:id'(get, #{bindings := #{id := Id}}) ->
?TRY_PARSE_ID(Id, lookup_from_all_nodes(ConnectorType, ConnectorName, 200)); ?TRY_PARSE_ID(Id, lookup_from_all_nodes(ConnectorType, ConnectorName, 200));
@ -389,9 +385,7 @@ schema("/connectors_probe") ->
{error, not_found} -> {error, not_found} ->
?CONNECTOR_NOT_FOUND(ConnectorType, ConnectorName) ?CONNECTOR_NOT_FOUND(ConnectorType, ConnectorName)
end end
); ).
'/connectors/:id'(_, _) ->
?METHOD_NOT_ALLOWED.
'/connectors_probe'(post, Request) -> '/connectors_probe'(post, Request) ->
RequestMeta = #{module => ?MODULE, method => post, path => "/connectors_probe"}, RequestMeta = #{module => ?MODULE, method => post, path => "/connectors_probe"},
@ -417,9 +411,7 @@ schema("/connectors_probe") ->
end; end;
BadRequest -> BadRequest ->
redact(BadRequest) redact(BadRequest)
end; end.
'/connectors_probe'(_, _) ->
?METHOD_NOT_ALLOWED.
maybe_deobfuscate_connector_probe( maybe_deobfuscate_connector_probe(
#{<<"type">> := ConnectorType, <<"name">> := ConnectorName} = Params #{<<"type">> := ConnectorType, <<"name">> := ConnectorName} = Params
@ -471,22 +463,17 @@ create_or_update_connector(ConnectorType, ConnectorName, Conf, HttpStatusCode) -
'/connectors/:id/enable/:enable'(put, #{bindings := #{id := Id, enable := Enable}}) -> '/connectors/:id/enable/:enable'(put, #{bindings := #{id := Id, enable := Enable}}) ->
?TRY_PARSE_ID( ?TRY_PARSE_ID(
Id, Id,
case enable_func(Enable) of case emqx_connector:disable_enable(enable_func(Enable), ConnectorType, ConnectorName) of
invalid -> {ok, _} ->
?NOT_FOUND(<<"Invalid operation">>); ?NO_CONTENT;
OperFunc -> {error, {pre_config_update, _, connector_not_found}} ->
case emqx_connector:disable_enable(OperFunc, ConnectorType, ConnectorName) of ?CONNECTOR_NOT_FOUND(ConnectorType, ConnectorName);
{ok, _} -> {error, {_, _, timeout}} ->
?NO_CONTENT; ?SERVICE_UNAVAILABLE(<<"request timeout">>);
{error, {pre_config_update, _, connector_not_found}} -> {error, timeout} ->
?CONNECTOR_NOT_FOUND(ConnectorType, ConnectorName); ?SERVICE_UNAVAILABLE(<<"request timeout">>);
{error, {_, _, timeout}} -> {error, Reason} ->
?SERVICE_UNAVAILABLE(<<"request timeout">>); ?INTERNAL_ERROR(Reason)
{error, timeout} ->
?SERVICE_UNAVAILABLE(<<"request timeout">>);
{error, Reason} ->
?INTERNAL_ERROR(Reason)
end
end end
). ).
@ -496,20 +483,10 @@ create_or_update_connector(ConnectorType, ConnectorName, Conf, HttpStatusCode) -
}) -> }) ->
?TRY_PARSE_ID( ?TRY_PARSE_ID(
Id, Id,
case operation_to_all_func(Op) of begin
invalid -> OperFunc = operation_func(all, Op),
?NOT_FOUND(<<"Invalid operation: ", Op/binary>>); Nodes = mria:running_nodes(),
OperFunc -> call_operation_if_enabled(all, OperFunc, [Nodes, ConnectorType, ConnectorName])
try is_enabled_connector(ConnectorType, ConnectorName) of
false ->
?CONNECTOR_NOT_ENABLED;
true ->
Nodes = mria:running_nodes(),
call_operation(all, OperFunc, [Nodes, ConnectorType, ConnectorName])
catch
throw:not_found ->
?CONNECTOR_NOT_FOUND(ConnectorType, ConnectorName)
end
end end
). ).
@ -519,29 +496,28 @@ create_or_update_connector(ConnectorType, ConnectorName, Conf, HttpStatusCode) -
}) -> }) ->
?TRY_PARSE_ID( ?TRY_PARSE_ID(
Id, Id,
case node_operation_func(Op) of case emqx_utils:safe_to_existing_atom(Node, utf8) of
invalid -> {ok, TargetNode} ->
?NOT_FOUND(<<"Invalid operation: ", Op/binary>>); OperFunc = operation_func(TargetNode, Op),
OperFunc -> call_operation_if_enabled(TargetNode, OperFunc, [
try is_enabled_connector(ConnectorType, ConnectorName) of TargetNode, ConnectorType, ConnectorName
false -> ]);
?CONNECTOR_NOT_ENABLED; {error, _} ->
true -> ?NOT_FOUND(<<"Invalid node name: ", Node/binary>>)
case emqx_utils:safe_to_existing_atom(Node, utf8) of
{ok, TargetNode} ->
call_operation(TargetNode, OperFunc, [
TargetNode, ConnectorType, ConnectorName
]);
{error, _} ->
?NOT_FOUND(<<"Invalid node name: ", Node/binary>>)
end
catch
throw:not_found ->
?CONNECTOR_NOT_FOUND(ConnectorType, ConnectorName)
end
end end
). ).
call_operation_if_enabled(NodeOrAll, OperFunc, [Nodes, BridgeType, BridgeName]) ->
try is_enabled_connector(BridgeType, BridgeName) of
false ->
?CONNECTOR_NOT_ENABLED;
true ->
call_operation(NodeOrAll, OperFunc, [Nodes, BridgeType, BridgeName])
catch
throw:not_found ->
?CONNECTOR_NOT_FOUND(BridgeType, BridgeName)
end.
is_enabled_connector(ConnectorType, ConnectorName) -> is_enabled_connector(ConnectorType, ConnectorName) ->
try emqx:get_config([connectors, ConnectorType, binary_to_existing_atom(ConnectorName)]) of try emqx:get_config([connectors, ConnectorType, binary_to_existing_atom(ConnectorName)]) of
ConfMap -> ConfMap ->
@ -555,19 +531,15 @@ is_enabled_connector(ConnectorType, ConnectorName) ->
throw(not_found) throw(not_found)
end. end.
node_operation_func(<<"restart">>) -> restart_connector_to_node; operation_func(all, restart) -> restart_connectors_to_all_nodes;
node_operation_func(<<"start">>) -> start_connector_to_node; operation_func(all, start) -> start_connectors_to_all_nodes;
node_operation_func(<<"stop">>) -> stop_connector_to_node; operation_func(all, stop) -> stop_connectors_to_all_nodes;
node_operation_func(_) -> invalid. operation_func(_Node, restart) -> restart_connector_to_node;
operation_func(_Node, start) -> start_connector_to_node;
operation_func(_Node, stop) -> stop_connector_to_node.
operation_to_all_func(<<"restart">>) -> restart_connectors_to_all_nodes; enable_func(true) -> enable;
operation_to_all_func(<<"start">>) -> start_connectors_to_all_nodes; enable_func(false) -> disable.
operation_to_all_func(<<"stop">>) -> stop_connectors_to_all_nodes;
operation_to_all_func(_) -> invalid.
enable_func(<<"true">>) -> enable;
enable_func(<<"false">>) -> disable;
enable_func(_) -> invalid.
zip_connectors([ConnectorsFirstNode | _] = ConnectorsAllNodes) -> zip_connectors([ConnectorsFirstNode | _] = ConnectorsAllNodes) ->
lists:foldl( lists:foldl(

View File

@ -244,16 +244,14 @@ bridge_api_union(Refs) ->
undefined -> undefined ->
throw(#{ throw(#{
field_name => type, field_name => type,
value => T,
reason => <<"unknown bridge type">> reason => <<"unknown bridge type">>
}); });
Ref -> Ref ->
[Ref] [Ref]
end; end;
_ -> _ ->
throw(#{ maps:values(Index)
field_name => type,
reason => <<"unknown bridge type">>
})
end end
end. end.

View File

@ -26,7 +26,6 @@
-define(CONNECTOR_NAME, (atom_to_binary(?FUNCTION_NAME))). -define(CONNECTOR_NAME, (atom_to_binary(?FUNCTION_NAME))).
-define(CONNECTOR(NAME, TYPE), #{ -define(CONNECTOR(NAME, TYPE), #{
<<"enable">> => true,
%<<"ssl">> => #{<<"enable">> => false}, %<<"ssl">> => #{<<"enable">> => false},
<<"type">> => TYPE, <<"type">> => TYPE,
<<"name">> => NAME <<"name">> => NAME
@ -35,10 +34,11 @@
-define(CONNECTOR_TYPE_STR, "kafka"). -define(CONNECTOR_TYPE_STR, "kafka").
-define(CONNECTOR_TYPE, <<?CONNECTOR_TYPE_STR>>). -define(CONNECTOR_TYPE, <<?CONNECTOR_TYPE_STR>>).
-define(KAFKA_BOOTSTRAP_HOST, <<"127.0.0.1:9092">>). -define(KAFKA_BOOTSTRAP_HOST, <<"127.0.0.1:9092">>).
-define(KAFKA_CONNECTOR(Name, BootstrapHosts), ?CONNECTOR(Name, ?CONNECTOR_TYPE)#{ -define(KAFKA_CONNECTOR_BASE(BootstrapHosts), #{
<<"authentication">> => <<"none">>, <<"authentication">> => <<"none">>,
<<"bootstrap_hosts">> => BootstrapHosts, <<"bootstrap_hosts">> => BootstrapHosts,
<<"connect_timeout">> => <<"5s">>, <<"connect_timeout">> => <<"5s">>,
<<"enable">> => true,
<<"metadata_request_timeout">> => <<"5s">>, <<"metadata_request_timeout">> => <<"5s">>,
<<"min_metadata_refresh_interval">> => <<"3s">>, <<"min_metadata_refresh_interval">> => <<"3s">>,
<<"socket_opts">> => <<"socket_opts">> =>
@ -49,6 +49,9 @@
<<"tcp_keepalive">> => <<"none">> <<"tcp_keepalive">> => <<"none">>
} }
}). }).
-define(KAFKA_CONNECTOR_BASE, ?KAFKA_CONNECTOR_BASE(?KAFKA_BOOTSTRAP_HOST)).
-define(KAFKA_CONNECTOR(Name, BootstrapHosts),
?CONNECTOR(Name, ?CONNECTOR_TYPE)?KAFKA_CONNECTOR_BASE(BootstrapHosts)).
-define(KAFKA_CONNECTOR(Name), ?KAFKA_CONNECTOR(Name, ?KAFKA_BOOTSTRAP_HOST)). -define(KAFKA_CONNECTOR(Name), ?KAFKA_CONNECTOR(Name, ?KAFKA_BOOTSTRAP_HOST)).
%% -define(CONNECTOR_TYPE_MQTT, <<"mqtt">>). %% -define(CONNECTOR_TYPE_MQTT, <<"mqtt">>).
@ -284,14 +287,13 @@ t_connectors_lifecycle(Config) ->
<<"type">> := ?CONNECTOR_TYPE, <<"type">> := ?CONNECTOR_TYPE,
<<"name">> := ConnectorName, <<"name">> := ConnectorName,
<<"bootstrap_hosts">> := <<"foobla:1234">>, <<"bootstrap_hosts">> := <<"foobla:1234">>,
<<"enable">> := true,
<<"status">> := _, <<"status">> := _,
<<"node_status">> := [_ | _] <<"node_status">> := [_ | _]
}}, }},
request_json( request_json(
put, put,
uri(["connectors", ConnectorID]), uri(["connectors", ConnectorID]),
?KAFKA_CONNECTOR(?CONNECTOR_NAME, <<"foobla:1234">>), ?KAFKA_CONNECTOR_BASE(<<"foobla:1234">>),
Config Config
) )
), ),
@ -323,9 +325,9 @@ t_connectors_lifecycle(Config) ->
), ),
?assertMatch( ?assertMatch(
{ok, 404, #{ {ok, 400, #{
<<"code">> := <<"NOT_FOUND">>, <<"code">> := <<"BAD_REQUEST">>,
<<"message">> := <<"Invalid operation", _/binary>> <<"message">> := _
}}, }},
request_json(post, uri(["connectors", ConnectorID, "brababbel"]), Config) request_json(post, uri(["connectors", ConnectorID, "brababbel"]), Config)
), ),
@ -343,7 +345,7 @@ t_connectors_lifecycle(Config) ->
request_json( request_json(
put, put,
uri(["connectors", ConnectorID]), uri(["connectors", ConnectorID]),
?KAFKA_CONNECTOR(?CONNECTOR_NAME), ?KAFKA_CONNECTOR_BASE,
Config Config
) )
), ),
@ -453,7 +455,7 @@ do_start_stop_connectors(TestType, Config) ->
request_json(get, uri(["connectors", ConnectorID]), Config) request_json(get, uri(["connectors", ConnectorID]), Config)
), ),
{ok, 404, _} = request(post, {operation, TestType, invalidop, ConnectorID}, Config), {ok, 400, _} = request(post, {operation, TestType, invalidop, ConnectorID}, Config),
%% delete the connector %% delete the connector
{ok, 204, <<>>} = request(delete, uri(["connectors", ConnectorID]), Config), {ok, 204, <<>>} = request(delete, uri(["connectors", ConnectorID]), Config),
@ -574,7 +576,7 @@ t_enable_disable_connectors(Config) ->
{ok, 204, <<>>} = request(put, enable_path(false, ConnectorID), Config), {ok, 204, <<>>} = request(put, enable_path(false, ConnectorID), Config),
%% bad param %% bad param
{ok, 404, _} = request(put, enable_path(foo, ConnectorID), Config), {ok, 400, _} = request(put, enable_path(foo, ConnectorID), Config),
{ok, 404, _} = request(put, enable_path(true, "foo"), Config), {ok, 404, _} = request(put, enable_path(true, "foo"), Config),
{ok, 404, _} = request(put, enable_path(true, "webhook:foo"), Config), {ok, 404, _} = request(put, enable_path(true, "webhook:foo"), Config),
@ -619,9 +621,9 @@ t_with_redact_update(Config) ->
), ),
%% update with redacted config %% update with redacted config
ConnectorConf = emqx_utils:redact(Template), ConnectorUpdatedConf = maps:without([<<"name">>, <<"type">>], emqx_utils:redact(Template)),
ConnectorID = emqx_connector_resource:connector_id(?CONNECTOR_TYPE, Name), ConnectorID = emqx_connector_resource:connector_id(?CONNECTOR_TYPE, Name),
{ok, 200, _} = request(put, uri(["connectors", ConnectorID]), ConnectorConf, Config), {ok, 200, _} = request(put, uri(["connectors", ConnectorID]), ConnectorUpdatedConf, Config),
?assertEqual( ?assertEqual(
Password, Password,
get_raw_config([connectors, ?CONNECTOR_TYPE, Name, authentication, password], Config) get_raw_config([connectors, ?CONNECTOR_TYPE, Name, authentication, password], Config)