Merge pull request #10107 from sstrigler/EMQX-9147-bridges-api-fix-handling-unknown-bridge-id

Bridges API: fix handling unknown bridge-id
This commit is contained in:
Stefan Strigler 2023-03-13 16:40:46 +01:00 committed by GitHub
commit 62aec87c56
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 184 additions and 89 deletions

View File

@ -46,12 +46,18 @@
-export([lookup_from_local_node/2]). -export([lookup_from_local_node/2]).
-define(BAD_REQUEST(Reason), {400, error_msg('BAD_REQUEST', Reason)}).
-define(BRIDGE_NOT_ENABLED,
?BAD_REQUEST(<<"Forbidden operation, bridge not enabled">>)
).
-define(NOT_FOUND(Reason), {404, error_msg('NOT_FOUND', Reason)}). -define(NOT_FOUND(Reason), {404, error_msg('NOT_FOUND', Reason)}).
-define(BRIDGE_NOT_FOUND(Type, Name), -define(BRIDGE_NOT_FOUND(BridgeType, BridgeName),
?NOT_FOUND( ?NOT_FOUND(
<<"Bridge lookup failed: bridge named '", Name/binary, "' of type ", <<"Bridge lookup failed: bridge named '", BridgeName/binary, "' of type ",
(atom_to_binary(Type))/binary, " does not exist.">> (atom_to_binary(BridgeType))/binary, " does not exist.">>
) )
). ).
@ -98,11 +104,11 @@ get_response_body_schema() ->
param_path_operation_cluster() -> param_path_operation_cluster() ->
{operation, {operation,
mk( mk(
enum([stop, restart]), enum([start, stop, restart]),
#{ #{
in => path, in => path,
required => true, required => true,
example => <<"restart">>, example => <<"start">>,
desc => ?DESC("desc_param_path_operation_cluster") desc => ?DESC("desc_param_path_operation_cluster")
} }
)}. )}.
@ -110,11 +116,11 @@ param_path_operation_cluster() ->
param_path_operation_on_node() -> param_path_operation_on_node() ->
{operation, {operation,
mk( mk(
enum([stop, restart]), enum([start, stop, restart]),
#{ #{
in => path, in => path,
required => true, required => true,
example => <<"stop">>, example => <<"start">>,
desc => ?DESC("desc_param_path_operation_on_node") desc => ?DESC("desc_param_path_operation_on_node")
} }
)}. )}.
@ -355,7 +361,7 @@ schema("/bridges/:id") ->
204 => <<"Bridge deleted">>, 204 => <<"Bridge deleted">>,
400 => error_schema( 400 => error_schema(
'BAD_REQUEST', 'BAD_REQUEST',
"Can not delete bridge while active rules defined for this bridge" "Cannot delete bridge while active rules are defined for this bridge"
), ),
404 => error_schema('NOT_FOUND', "Bridge not found"), 404 => error_schema('NOT_FOUND', "Bridge not found"),
503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable") 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable")
@ -447,7 +453,7 @@ schema("/nodes/:node/bridges/:id/:operation") ->
'BAD_REQUEST', 'BAD_REQUEST',
"Problem with configuration of external service or bridge not enabled" "Problem with configuration of external service or bridge not enabled"
), ),
404 => error_schema('NOT_FOUND', "Bridge not found or invalid operation"), 404 => error_schema('NOT_FOUND', "Bridge or node not found or invalid operation"),
501 => error_schema('NOT_IMPLEMENTED', "Not Implemented"), 501 => error_schema('NOT_IMPLEMENTED', "Not Implemented"),
503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable") 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable")
} }
@ -472,15 +478,13 @@ schema("/bridges_probe") ->
}. }.
'/bridges'(post, #{body := #{<<"type">> := BridgeType, <<"name">> := BridgeName} = Conf0}) -> '/bridges'(post, #{body := #{<<"type">> := BridgeType, <<"name">> := BridgeName} = Conf0}) ->
Conf = filter_out_request_body(Conf0),
case emqx_bridge:lookup(BridgeType, BridgeName) of case emqx_bridge:lookup(BridgeType, BridgeName) of
{ok, _} -> {ok, _} ->
{400, error_msg('ALREADY_EXISTS', <<"bridge already exists">>)}; {400, error_msg('ALREADY_EXISTS', <<"bridge already exists">>)};
{error, not_found} -> {error, not_found} ->
case ensure_bridge_created(BridgeType, BridgeName, Conf) of Conf = filter_out_request_body(Conf0),
ok -> lookup_from_all_nodes(BridgeType, BridgeName, 201); {ok, _} = emqx_bridge:create(BridgeType, BridgeName, Conf),
{error, Error} -> {400, Error} lookup_from_all_nodes(BridgeType, BridgeName, 201)
end
end; end;
'/bridges'(get, _Params) -> '/bridges'(get, _Params) ->
{200, {200,
@ -499,12 +503,8 @@ schema("/bridges_probe") ->
{ok, _} -> {ok, _} ->
RawConf = emqx:get_raw_config([bridges, BridgeType, BridgeName], #{}), RawConf = emqx:get_raw_config([bridges, BridgeType, BridgeName], #{}),
Conf = deobfuscate(Conf1, RawConf), Conf = deobfuscate(Conf1, RawConf),
case ensure_bridge_created(BridgeType, BridgeName, Conf) of {ok, _} = emqx_bridge:create(BridgeType, BridgeName, Conf),
ok -> lookup_from_all_nodes(BridgeType, BridgeName, 200);
lookup_from_all_nodes(BridgeType, BridgeName, 200);
{error, Error} ->
{400, Error}
end;
{error, not_found} -> {error, not_found} ->
?BRIDGE_NOT_FOUND(BridgeType, BridgeName) ?BRIDGE_NOT_FOUND(BridgeType, BridgeName)
end end
@ -524,12 +524,10 @@ schema("/bridges_probe") ->
{ok, _} -> {ok, _} ->
204; 204;
{error, {rules_deps_on_this_bridge, RuleIds}} -> {error, {rules_deps_on_this_bridge, RuleIds}} ->
{400, ?BAD_REQUEST(
error_msg( {<<"Cannot delete bridge while active rules are defined for this bridge">>,
'BAD_REQUEST', RuleIds}
{<<"Can not delete bridge while active rules defined for this bridge">>, );
RuleIds}
)};
{error, timeout} -> {error, timeout} ->
{503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)};
{error, Reason} -> {error, Reason} ->
@ -561,7 +559,7 @@ schema("/bridges_probe") ->
Params1 = maybe_deobfuscate_bridge_probe(Params), Params1 = maybe_deobfuscate_bridge_probe(Params),
case emqx_bridge_resource:create_dry_run(ConnType, maps:remove(<<"type">>, Params1)) of case emqx_bridge_resource:create_dry_run(ConnType, maps:remove(<<"type">>, Params1)) of
ok -> ok ->
{204}; 204;
{error, Reason} when not is_tuple(Reason); element(1, Reason) =/= 'exit' -> {error, Reason} when not is_tuple(Reason); element(1, Reason) =/= 'exit' ->
{400, error_msg('TEST_FAILED', to_hr_reason(Reason))} {400, error_msg('TEST_FAILED', to_hr_reason(Reason))}
end; end;
@ -615,7 +613,7 @@ lookup_from_local_node(BridgeType, BridgeName) ->
OperFunc -> OperFunc ->
case emqx_bridge:disable_enable(OperFunc, BridgeType, BridgeName) of case emqx_bridge:disable_enable(OperFunc, BridgeType, BridgeName) of
{ok, _} -> {ok, _} ->
{204}; 204;
{error, {pre_config_update, _, bridge_not_found}} -> {error, {pre_config_update, _, bridge_not_found}} ->
?BRIDGE_NOT_FOUND(BridgeType, BridgeName); ?BRIDGE_NOT_FOUND(BridgeType, BridgeName);
{error, {_, _, timeout}} -> {error, {_, _, timeout}} ->
@ -638,8 +636,16 @@ lookup_from_local_node(BridgeType, BridgeName) ->
invalid -> invalid ->
?NOT_FOUND(<<"Invalid operation: ", Op/binary>>); ?NOT_FOUND(<<"Invalid operation: ", Op/binary>>);
OperFunc -> OperFunc ->
Nodes = mria:running_nodes(), try is_enabled_bridge(BridgeType, BridgeName) of
call_operation(all, OperFunc, [Nodes, BridgeType, BridgeName]) false ->
?BRIDGE_NOT_ENABLED;
true ->
Nodes = mria:running_nodes(),
call_operation(all, OperFunc, [Nodes, BridgeType, BridgeName])
catch
throw:not_found ->
?BRIDGE_NOT_FOUND(BridgeType, BridgeName)
end
end end
). ).
@ -653,14 +659,9 @@ lookup_from_local_node(BridgeType, BridgeName) ->
invalid -> invalid ->
?NOT_FOUND(<<"Invalid operation: ", Op/binary>>); ?NOT_FOUND(<<"Invalid operation: ", Op/binary>>);
OperFunc -> OperFunc ->
ConfMap = emqx:get_config([bridges, BridgeType, BridgeName]), try is_enabled_bridge(BridgeType, BridgeName) of
case maps:get(enable, ConfMap, false) of
false -> false ->
{400, ?BRIDGE_NOT_ENABLED;
error_msg(
'BAD_REQUEST',
<<"Forbidden operation, bridge not enabled">>
)};
true -> true ->
case emqx_misc:safe_to_existing_atom(Node, utf8) of case emqx_misc:safe_to_existing_atom(Node, utf8) of
{ok, TargetNode} -> {ok, TargetNode} ->
@ -670,10 +671,22 @@ lookup_from_local_node(BridgeType, BridgeName) ->
{error, _} -> {error, _} ->
?NOT_FOUND(<<"Invalid node name: ", Node/binary>>) ?NOT_FOUND(<<"Invalid node name: ", Node/binary>>)
end end
catch
throw:not_found ->
?BRIDGE_NOT_FOUND(BridgeType, BridgeName)
end end
end end
). ).
is_enabled_bridge(BridgeType, BridgeName) ->
try emqx:get_config([bridges, BridgeType, BridgeName]) of
ConfMap ->
maps:get(enable, ConfMap, false)
catch
error:{config_not_found, _} ->
throw(not_found)
end.
node_operation_func(<<"restart">>) -> restart_bridge_to_node; node_operation_func(<<"restart">>) -> restart_bridge_to_node;
node_operation_func(<<"start">>) -> start_bridge_to_node; node_operation_func(<<"start">>) -> start_bridge_to_node;
node_operation_func(<<"stop">>) -> stop_bridge_to_node; node_operation_func(<<"stop">>) -> stop_bridge_to_node;
@ -688,12 +701,6 @@ enable_func(<<"true">>) -> enable;
enable_func(<<"false">>) -> disable; enable_func(<<"false">>) -> disable;
enable_func(_) -> invalid. enable_func(_) -> invalid.
ensure_bridge_created(BridgeType, BridgeName, Conf) ->
case emqx_bridge:create(BridgeType, BridgeName, Conf) of
{ok, _} -> ok;
{error, Reason} -> {error, error_msg('BAD_REQUEST', Reason)}
end.
zip_bridges([BridgesFirstNode | _] = BridgesAllNodes) -> zip_bridges([BridgesFirstNode | _] = BridgesAllNodes) ->
lists:foldl( lists:foldl(
fun(#{type := Type, name := Name}, Acc) -> fun(#{type := Type, name := Name}, Acc) ->
@ -930,12 +937,10 @@ bin(S) when is_atom(S) ->
bin(S) when is_binary(S) -> bin(S) when is_binary(S) ->
S. S.
call_operation(NodeOrAll, OperFunc, Args) -> call_operation(NodeOrAll, OperFunc, Args = [_Nodes, BridgeType, BridgeName]) ->
case is_ok(do_bpapi_call(NodeOrAll, OperFunc, Args)) of case is_ok(do_bpapi_call(NodeOrAll, OperFunc, Args)) of
ok -> Ok when Ok =:= ok; is_tuple(Ok), element(1, Ok) =:= ok ->
{204}; 204;
{ok, _} ->
{204};
{error, not_implemented} -> {error, not_implemented} ->
%% Should only happen if we call `start` on a node that is %% Should only happen if we call `start` on a node that is
%% still on an older bpapi version that doesn't support it. %% still on an older bpapi version that doesn't support it.
@ -953,8 +958,12 @@ call_operation(NodeOrAll, OperFunc, Args) ->
) )
) )
)}; )};
{error, not_found} ->
?BRIDGE_NOT_FOUND(BridgeType, BridgeName);
{error, {node_not_found, Node}} ->
?NOT_FOUND(<<"Node not found: ", (atom_to_binary(Node))/binary>>);
{error, Reason} when not is_tuple(Reason); element(1, Reason) =/= 'exit' -> {error, Reason} when not is_tuple(Reason); element(1, Reason) =/= 'exit' ->
{400, error_msg('BAD_REQUEST', to_hr_reason(Reason))} ?BAD_REQUEST(to_hr_reason(Reason))
end. end.
maybe_try_restart(all, start_bridges_to_all_nodes, Args) -> maybe_try_restart(all, start_bridges_to_all_nodes, Args) ->
@ -962,14 +971,19 @@ maybe_try_restart(all, start_bridges_to_all_nodes, Args) ->
maybe_try_restart(Node, start_bridge_to_node, Args) -> maybe_try_restart(Node, start_bridge_to_node, Args) ->
call_operation(Node, restart_bridge_to_node, Args); call_operation(Node, restart_bridge_to_node, Args);
maybe_try_restart(_, _, _) -> maybe_try_restart(_, _, _) ->
{501}. 501.
do_bpapi_call(all, Call, Args) -> do_bpapi_call(all, Call, Args) ->
maybe_unwrap( maybe_unwrap(
do_bpapi_call_vsn(emqx_bpapi:supported_version(emqx_bridge), Call, Args) do_bpapi_call_vsn(emqx_bpapi:supported_version(emqx_bridge), Call, Args)
); );
do_bpapi_call(Node, Call, Args) -> do_bpapi_call(Node, Call, Args) ->
do_bpapi_call_vsn(emqx_bpapi:supported_version(Node, emqx_bridge), Call, Args). case lists:member(Node, mria:running_nodes()) of
true ->
do_bpapi_call_vsn(emqx_bpapi:supported_version(Node, emqx_bridge), Call, Args);
false ->
{error, {node_not_found, Node}}
end.
do_bpapi_call_vsn(SupportedVersion, Call, Args) -> do_bpapi_call_vsn(SupportedVersion, Call, Args) ->
case lists:member(SupportedVersion, supported_versions(Call)) of case lists:member(SupportedVersion, supported_versions(Call)) of

View File

@ -196,6 +196,9 @@ t_http_crud_apis(Config) ->
%% assert we there's no bridges at first %% assert we there's no bridges at first
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
{ok, 404, _} = request(get, uri(["bridges", "foo"]), []),
{ok, 404, _} = request(get, uri(["bridges", "webhook:foo"]), []),
%% then we add a webhook bridge, using POST %% then we add a webhook bridge, using POST
%% POST /bridges/ will create a bridge %% POST /bridges/ will create a bridge
URL1 = ?URL(Port, "path1"), URL1 = ?URL(Port, "path1"),
@ -214,7 +217,7 @@ t_http_crud_apis(Config) ->
<<"status">> := _, <<"status">> := _,
<<"node_status">> := [_ | _], <<"node_status">> := [_ | _],
<<"url">> := URL1 <<"url">> := URL1
} = jsx:decode(Bridge), } = emqx_json:decode(Bridge, [return_maps]),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
%% send an message to emqx and the message should be forwarded to the HTTP server %% send an message to emqx and the message should be forwarded to the HTTP server
@ -251,7 +254,7 @@ t_http_crud_apis(Config) ->
<<"node_status">> := [_ | _], <<"node_status">> := [_ | _],
<<"url">> := URL2 <<"url">> := URL2
}, },
jsx:decode(Bridge2) emqx_json:decode(Bridge2, [return_maps])
), ),
%% list all bridges again, assert Bridge2 is in it %% list all bridges again, assert Bridge2 is in it
@ -269,7 +272,7 @@ t_http_crud_apis(Config) ->
<<"url">> := URL2 <<"url">> := URL2
} }
], ],
jsx:decode(Bridge2Str) emqx_json:decode(Bridge2Str, [return_maps])
), ),
%% get the bridge by id %% get the bridge by id
@ -283,7 +286,7 @@ t_http_crud_apis(Config) ->
<<"node_status">> := [_ | _], <<"node_status">> := [_ | _],
<<"url">> := URL2 <<"url">> := URL2
}, },
jsx:decode(Bridge3Str) emqx_json:decode(Bridge3Str, [return_maps])
), ),
%% send an message to emqx again, check the path has been changed %% send an message to emqx again, check the path has been changed
@ -315,8 +318,19 @@ t_http_crud_apis(Config) ->
<<"code">> := <<"NOT_FOUND">>, <<"code">> := <<"NOT_FOUND">>,
<<"message">> := _ <<"message">> := _
}, },
jsx:decode(ErrMsg2) emqx_json:decode(ErrMsg2, [return_maps])
), ),
%% try delete bad bridge id
{ok, 404, BadId} = request(delete, uri(["bridges", "foo"]), []),
?assertMatch(
#{
<<"code">> := <<"NOT_FOUND">>,
<<"message">> := <<"Invalid bridge ID", _/binary>>
},
emqx_json:decode(BadId, [return_maps])
),
%% Deleting a non-existing bridge should result in an error %% Deleting a non-existing bridge should result in an error
{ok, 404, ErrMsg3} = request(delete, uri(["bridges", BridgeID]), []), {ok, 404, ErrMsg3} = request(delete, uri(["bridges", BridgeID]), []),
?assertMatch( ?assertMatch(
@ -324,7 +338,7 @@ t_http_crud_apis(Config) ->
<<"code">> := <<"NOT_FOUND">>, <<"code">> := <<"NOT_FOUND">>,
<<"message">> := _ <<"message">> := _
}, },
jsx:decode(ErrMsg3) emqx_json:decode(ErrMsg3, [return_maps])
), ),
ok. ok.
@ -402,14 +416,17 @@ t_check_dependent_actions_on_delete(Config) ->
<<"sql">> => <<"SELECT * from \"t\"">> <<"sql">> => <<"SELECT * from \"t\"">>
} }
), ),
#{<<"id">> := RuleId} = jsx:decode(Rule), #{<<"id">> := RuleId} = emqx_json:decode(Rule, [return_maps]),
%% delete the bridge should fail because there is a rule depenents on it %% deleting the bridge should fail because there is a rule that depends on it
{ok, 400, _} = request(delete, uri(["bridges", BridgeID]), []), {ok, 400, _} = request(
delete, uri(["bridges", BridgeID]) ++ "?also_delete_dep_actions=false", []
),
%% delete the rule first %% delete the rule first
{ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []), {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
%% then delete the bridge is OK %% then delete the bridge is OK
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
ok. ok.
t_cascade_delete_actions(Config) -> t_cascade_delete_actions(Config) ->
@ -437,18 +454,39 @@ t_cascade_delete_actions(Config) ->
<<"sql">> => <<"SELECT * from \"t\"">> <<"sql">> => <<"SELECT * from \"t\"">>
} }
), ),
#{<<"id">> := RuleId} = jsx:decode(Rule), #{<<"id">> := RuleId} = emqx_json:decode(Rule, [return_maps]),
%% delete the bridge will also delete the actions from the rules %% delete the bridge will also delete the actions from the rules
{ok, 204, _} = request(delete, uri(["bridges", BridgeID]) ++ "?also_delete_dep_actions", []), {ok, 204, _} = request(
delete, uri(["bridges", BridgeID]) ++ "?also_delete_dep_actions=true", []
),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
{ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []), {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []),
?assertMatch( ?assertMatch(
#{ #{
<<"actions">> := [] <<"actions">> := []
}, },
jsx:decode(Rule1) emqx_json:decode(Rule1, [return_maps])
), ),
{ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []), {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
{ok, 201, _} = request(
post,
uri(["bridges"]),
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
),
{ok, 201, _} = request(
post,
uri(["rules"]),
#{
<<"name">> => <<"t_http_crud_apis">>,
<<"enable">> => true,
<<"actions">> => [BridgeID],
<<"sql">> => <<"SELECT * from \"t\"">>
}
),
{ok, 204, _} = request(delete, uri(["bridges", BridgeID]) ++ "?also_delete_dep_actions", []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
ok. ok.
t_broken_bpapi_vsn(Config) -> t_broken_bpapi_vsn(Config) ->
@ -486,6 +524,18 @@ t_old_bpapi_vsn(Config) ->
ok. ok.
t_start_stop_bridges_node(Config) -> t_start_stop_bridges_node(Config) ->
{ok, 404, _} =
request(
post,
uri(["nodes", "thisbetterbenotanatomyet", "bridges", "webhook:foo", start]),
<<"">>
),
{ok, 404, _} =
request(
post,
uri(["nodes", "undefined", "bridges", "webhook:foo", start]),
<<"">>
),
do_start_stop_bridges(node, Config). do_start_stop_bridges(node, Config).
t_start_stop_bridges_cluster(Config) -> t_start_stop_bridges_cluster(Config) ->
@ -511,34 +561,42 @@ do_start_stop_bridges(Type, Config) ->
<<"status">> := <<"connected">>, <<"status">> := <<"connected">>,
<<"node_status">> := [_ | _], <<"node_status">> := [_ | _],
<<"url">> := URL1 <<"url">> := URL1
} = jsx:decode(Bridge), } = emqx_json:decode(Bridge, [return_maps]),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
%% stop it %% stop it
{ok, 204, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>), {ok, 204, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>),
{ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []), {ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []),
?assertMatch(#{<<"status">> := <<"stopped">>}, jsx:decode(Bridge2)), ?assertMatch(#{<<"status">> := <<"stopped">>}, emqx_json:decode(Bridge2, [return_maps])),
%% start again %% start again
{ok, 204, <<>>} = request(post, operation_path(Type, start, BridgeID), <<"">>), {ok, 204, <<>>} = request(post, operation_path(Type, start, BridgeID), <<"">>),
{ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []), {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3)), ?assertMatch(#{<<"status">> := <<"connected">>}, emqx_json:decode(Bridge3, [return_maps])),
%% start a started bridge %% start a started bridge
{ok, 204, <<>>} = request(post, operation_path(Type, start, BridgeID), <<"">>), {ok, 204, <<>>} = request(post, operation_path(Type, start, BridgeID), <<"">>),
{ok, 200, Bridge3_1} = request(get, uri(["bridges", BridgeID]), []), {ok, 200, Bridge3_1} = request(get, uri(["bridges", BridgeID]), []),
?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3_1)), ?assertMatch(#{<<"status">> := <<"connected">>}, emqx_json:decode(Bridge3_1, [return_maps])),
%% restart an already started bridge %% restart an already started bridge
{ok, 204, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>), {ok, 204, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>),
{ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []), {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3)), ?assertMatch(#{<<"status">> := <<"connected">>}, emqx_json:decode(Bridge3, [return_maps])),
%% stop it again %% stop it again
{ok, 204, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>), {ok, 204, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>),
%% restart a stopped bridge %% restart a stopped bridge
{ok, 204, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>), {ok, 204, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>),
{ok, 200, Bridge4} = request(get, uri(["bridges", BridgeID]), []), {ok, 200, Bridge4} = request(get, uri(["bridges", BridgeID]), []),
?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge4)), ?assertMatch(#{<<"status">> := <<"connected">>}, emqx_json:decode(Bridge4, [return_maps])),
{ok, 404, _} = request(post, operation_path(Type, invalidop, BridgeID), <<"">>),
%% delete the bridge %% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% Fail parse-id check
{ok, 404, _} = request(post, operation_path(Type, start, <<"wreckbook_fugazi">>), <<"">>),
%% Looks ok but doesn't exist
{ok, 404, _} = request(post, operation_path(Type, start, <<"webhook:cptn_hook">>), <<"">>),
%% Create broken bridge %% Create broken bridge
{ListenPort, Sock} = listen_on_random_port(), {ListenPort, Sock} = listen_on_random_port(),
%% Connecting to this endpoint should always timeout %% Connecting to this endpoint should always timeout
@ -556,7 +614,7 @@ do_start_stop_bridges(Type, Config) ->
<<"server">> := BadServer, <<"server">> := BadServer,
<<"status">> := <<"connecting">>, <<"status">> := <<"connecting">>,
<<"node_status">> := [_ | _] <<"node_status">> := [_ | _]
} = jsx:decode(BadBridge1), } = emqx_json:decode(BadBridge1, [return_maps]),
BadBridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_MQTT, BadName), BadBridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_MQTT, BadName),
?assertMatch( ?assertMatch(
{ok, SC, _} when SC == 500 orelse SC == 503, {ok, SC, _} when SC == 500 orelse SC == 503,
@ -585,33 +643,39 @@ t_enable_disable_bridges(Config) ->
<<"status">> := <<"connected">>, <<"status">> := <<"connected">>,
<<"node_status">> := [_ | _], <<"node_status">> := [_ | _],
<<"url">> := URL1 <<"url">> := URL1
} = jsx:decode(Bridge), } = emqx_json:decode(Bridge, [return_maps]),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
%% disable it %% disable it
{ok, 204, <<>>} = request(put, enable_path(false, BridgeID), <<"">>), {ok, 204, <<>>} = request(put, enable_path(false, BridgeID), <<"">>),
{ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []), {ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []),
?assertMatch(#{<<"status">> := <<"stopped">>}, jsx:decode(Bridge2)), ?assertMatch(#{<<"status">> := <<"stopped">>}, emqx_json:decode(Bridge2, [return_maps])),
%% enable again %% enable again
{ok, 204, <<>>} = request(put, enable_path(true, BridgeID), <<"">>), {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), <<"">>),
{ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []), {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3)), ?assertMatch(#{<<"status">> := <<"connected">>}, emqx_json:decode(Bridge3, [return_maps])),
%% enable an already started bridge %% enable an already started bridge
{ok, 204, <<>>} = request(put, enable_path(true, BridgeID), <<"">>), {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), <<"">>),
{ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []), {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3)), ?assertMatch(#{<<"status">> := <<"connected">>}, emqx_json:decode(Bridge3, [return_maps])),
%% disable it again %% disable it again
{ok, 204, <<>>} = request(put, enable_path(false, BridgeID), <<"">>), {ok, 204, <<>>} = request(put, enable_path(false, BridgeID), <<"">>),
{ok, 400, Res} = request(post, operation_path(node, restart, BridgeID), <<"">>), %% bad param
{ok, 404, _} = request(put, enable_path(foo, BridgeID), <<"">>),
{ok, 404, _} = request(put, enable_path(true, "foo"), <<"">>),
{ok, 404, _} = request(put, enable_path(true, "webhook:foo"), <<"">>),
{ok, 400, Res} = request(post, operation_path(node, start, BridgeID), <<"">>),
?assertEqual( ?assertEqual(
<<"{\"code\":\"BAD_REQUEST\",\"message\":\"Forbidden operation, bridge not enabled\"}">>, <<"{\"code\":\"BAD_REQUEST\",\"message\":\"Forbidden operation, bridge not enabled\"}">>,
Res Res
), ),
{ok, 400, Res} = request(post, operation_path(cluster, start, BridgeID), <<"">>),
%% enable a stopped bridge %% enable a stopped bridge
{ok, 204, <<>>} = request(put, enable_path(true, BridgeID), <<"">>), {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), <<"">>),
{ok, 200, Bridge4} = request(get, uri(["bridges", BridgeID]), []), {ok, 200, Bridge4} = request(get, uri(["bridges", BridgeID]), []),
?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge4)), ?assertMatch(#{<<"status">> := <<"connected">>}, emqx_json:decode(Bridge4, [return_maps])),
%% delete the bridge %% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []). {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []).
@ -636,7 +700,7 @@ t_reset_bridges(Config) ->
<<"status">> := <<"connected">>, <<"status">> := <<"connected">>,
<<"node_status">> := [_ | _], <<"node_status">> := [_ | _],
<<"url">> := URL1 <<"url">> := URL1
} = jsx:decode(Bridge), } = emqx_json:decode(Bridge, [return_maps]),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
{ok, 204, <<>>} = request(put, uri(["bridges", BridgeID, "metrics/reset"]), []), {ok, 204, <<>>} = request(put, uri(["bridges", BridgeID, "metrics/reset"]), []),
@ -704,7 +768,7 @@ t_bridges_probe(Config) ->
<<"code">> := <<"TEST_FAILED">>, <<"code">> := <<"TEST_FAILED">>,
<<"message">> := _ <<"message">> := _
}, },
jsx:decode(NxDomain) emqx_json:decode(NxDomain, [return_maps])
), ),
{ok, 204, _} = request( {ok, 204, _} = request(
@ -723,7 +787,7 @@ t_bridges_probe(Config) ->
<<"code">> := <<"TEST_FAILED">>, <<"code">> := <<"TEST_FAILED">>,
<<"message">> := <<"Connection refused">> <<"message">> := <<"Connection refused">>
}, },
jsx:decode(ConnRefused) emqx_json:decode(ConnRefused, [return_maps])
), ),
{ok, 400, HostNotFound} = request( {ok, 400, HostNotFound} = request(
@ -736,7 +800,7 @@ t_bridges_probe(Config) ->
<<"code">> := <<"TEST_FAILED">>, <<"code">> := <<"TEST_FAILED">>,
<<"message">> := <<"Host not found">> <<"message">> := <<"Host not found">>
}, },
jsx:decode(HostNotFound) emqx_json:decode(HostNotFound, [return_maps])
), ),
AuthnConfig = #{ AuthnConfig = #{
@ -767,7 +831,7 @@ t_bridges_probe(Config) ->
<<"code">> := <<"TEST_FAILED">>, <<"code">> := <<"TEST_FAILED">>,
<<"message">> := <<"Unauthorized client">> <<"message">> := <<"Unauthorized client">>
}, },
jsx:decode(Unauthorized) emqx_json:decode(Unauthorized, [return_maps])
), ),
{ok, 400, Malformed} = request( {ok, 400, Malformed} = request(
@ -782,7 +846,7 @@ t_bridges_probe(Config) ->
<<"code">> := <<"TEST_FAILED">>, <<"code">> := <<"TEST_FAILED">>,
<<"message">> := <<"Malformed username or password">> <<"message">> := <<"Malformed username or password">>
}, },
jsx:decode(Malformed) emqx_json:decode(Malformed, [return_maps])
), ),
{ok, 400, NotAuthorized} = request( {ok, 400, NotAuthorized} = request(
@ -795,7 +859,7 @@ t_bridges_probe(Config) ->
<<"code">> := <<"TEST_FAILED">>, <<"code">> := <<"TEST_FAILED">>,
<<"message">> := <<"Not authorized">> <<"message">> := <<"Not authorized">>
}, },
jsx:decode(NotAuthorized) emqx_json:decode(NotAuthorized, [return_maps])
), ),
{ok, 400, BadReq} = request( {ok, 400, BadReq} = request(
@ -803,7 +867,7 @@ t_bridges_probe(Config) ->
uri(["bridges_probe"]), uri(["bridges_probe"]),
?BRIDGE(<<"bad_bridge">>, <<"unknown_type">>) ?BRIDGE(<<"bad_bridge">>, <<"unknown_type">>)
), ),
?assertMatch(#{<<"code">> := <<"BAD_REQUEST">>}, jsx:decode(BadReq)), ?assertMatch(#{<<"code">> := <<"BAD_REQUEST">>}, emqx_json:decode(BadReq, [return_maps])),
ok. ok.
t_metrics(Config) -> t_metrics(Config) ->
@ -829,7 +893,7 @@ t_metrics(Config) ->
<<"status">> := _, <<"status">> := _,
<<"node_status">> := [_ | _], <<"node_status">> := [_ | _],
<<"url">> := URL1 <<"url">> := URL1
} = jsx:decode(Bridge), } = emqx_json:decode(Bridge, [return_maps]),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
@ -840,12 +904,12 @@ t_metrics(Config) ->
<<"metrics">> := #{<<"success">> := 0}, <<"metrics">> := #{<<"success">> := 0},
<<"node_metrics">> := [_ | _] <<"node_metrics">> := [_ | _]
}, },
jsx:decode(Bridge1Str) emqx_json:decode(Bridge1Str, [return_maps])
), ),
%% check that the bridge doesn't contain metrics anymore %% check that the bridge doesn't contain metrics anymore
{ok, 200, Bridge2Str} = request(get, uri(["bridges", BridgeID]), []), {ok, 200, Bridge2Str} = request(get, uri(["bridges", BridgeID]), []),
Decoded = jsx:decode(Bridge2Str), Decoded = emqx_json:decode(Bridge2Str, [return_maps]),
?assertNot(maps:is_key(<<"metrics">>, Decoded)), ?assertNot(maps:is_key(<<"metrics">>, Decoded)),
?assertNot(maps:is_key(<<"node_metrics">>, Decoded)), ?assertNot(maps:is_key(<<"node_metrics">>, Decoded)),
@ -875,7 +939,7 @@ t_metrics(Config) ->
<<"metrics">> := #{<<"success">> := _}, <<"metrics">> := #{<<"success">> := _},
<<"node_metrics">> := [_ | _] <<"node_metrics">> := [_ | _]
}, },
jsx:decode(Bridge3Str) emqx_json:decode(Bridge3Str, [return_maps])
), ),
%% check for non-empty metrics when listing all bridges %% check for non-empty metrics when listing all bridges
@ -887,7 +951,7 @@ t_metrics(Config) ->
<<"node_metrics">> := [_ | _] <<"node_metrics">> := [_ | _]
} }
], ],
jsx:decode(BridgesStr) emqx_json:decode(BridgesStr, [return_maps])
), ),
ok. ok.

View File

@ -0,0 +1,9 @@
For operations on `bridges API` if `bridge-id` is unknown we now return `404`
instead of `400`. Also a bug was fixed that caused a crash if that was a node
operation. Additionally we now also check if the given bridge is enabled when
doing the cluster operation `start` . Affected endpoints:
* [cluster] `/bridges/:id/:operation`,
* [node] `/nodes/:node/bridges/:id/:operation`, where `operation` is one of
`[start|stop|restart]`.
Moreover, for a node operation, EMQX checks if node name is in our cluster and
return `404` instead of `501`.

View File

@ -0,0 +1,8 @@
现在对桥接的 API 进行调用时,如果 `bridge-id` 不存在,将会返回 `404`,而不再是`400`。
然后,还修复了这种情况下,在节点级别上进行 API 调用时,可能导致崩溃的问题。
另外,在启动某个桥接时,会先检查指定桥接是否已启用。
受影响的接口有:
* [cluster] `/bridges/:id/:operation`,
* [node] `/nodes/:node/bridges/:id/:operation`,
其中 `operation``[start|stop|restart]` 之一。
此外对于节点操作EMQX 将检查节点是否存在于集群中,如果不在,则会返回`404`,而不再是`501`。