fix(emqx_bridge): handle bridge not found in call_operation
This commit is contained in:
parent
fb3d101b3a
commit
80b81748df
|
@ -922,7 +922,7 @@ bin(S) when is_atom(S) ->
|
|||
bin(S) when is_binary(S) ->
|
||||
S.
|
||||
|
||||
call_operation(NodeOrAll, OperFunc, Args) ->
|
||||
call_operation(NodeOrAll, OperFunc, Args = [_Nodes, BridgeType, BridgeName]) ->
|
||||
case is_ok(do_bpapi_call(NodeOrAll, OperFunc, Args)) of
|
||||
Ok when Ok =:= ok; is_tuple(Ok), element(1, Ok) =:= ok ->
|
||||
204;
|
||||
|
@ -943,6 +943,8 @@ call_operation(NodeOrAll, OperFunc, Args) ->
|
|||
)
|
||||
)
|
||||
)};
|
||||
{error, not_found} ->
|
||||
?BRIDGE_NOT_FOUND(BridgeType, BridgeName);
|
||||
{error, Reason} when not is_tuple(Reason); element(1, Reason) =/= 'exit' ->
|
||||
?BAD_REQUEST(to_hr_reason(Reason))
|
||||
end.
|
||||
|
@ -952,7 +954,7 @@ maybe_try_restart(all, start_bridges_to_all_nodes, Args) ->
|
|||
maybe_try_restart(Node, start_bridge_to_node, Args) ->
|
||||
call_operation(Node, restart_bridge_to_node, Args);
|
||||
maybe_try_restart(_, _, _) ->
|
||||
{501}.
|
||||
501.
|
||||
|
||||
do_bpapi_call(all, Call, Args) ->
|
||||
maybe_unwrap(
|
||||
|
|
|
@ -214,7 +214,7 @@ t_http_crud_apis(Config) ->
|
|||
<<"status">> := _,
|
||||
<<"node_status">> := [_ | _],
|
||||
<<"url">> := URL1
|
||||
} = jsx:decode(Bridge),
|
||||
} = emqx_json:decode(Bridge, [return_maps]),
|
||||
|
||||
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
|
||||
%% send an message to emqx and the message should be forwarded to the HTTP server
|
||||
|
@ -251,7 +251,7 @@ t_http_crud_apis(Config) ->
|
|||
<<"node_status">> := [_ | _],
|
||||
<<"url">> := URL2
|
||||
},
|
||||
jsx:decode(Bridge2)
|
||||
emqx_json:decode(Bridge2, [return_maps])
|
||||
),
|
||||
|
||||
%% list all bridges again, assert Bridge2 is in it
|
||||
|
@ -269,7 +269,7 @@ t_http_crud_apis(Config) ->
|
|||
<<"url">> := URL2
|
||||
}
|
||||
],
|
||||
jsx:decode(Bridge2Str)
|
||||
emqx_json:decode(Bridge2Str, [return_maps])
|
||||
),
|
||||
|
||||
%% get the bridge by id
|
||||
|
@ -283,7 +283,7 @@ t_http_crud_apis(Config) ->
|
|||
<<"node_status">> := [_ | _],
|
||||
<<"url">> := URL2
|
||||
},
|
||||
jsx:decode(Bridge3Str)
|
||||
emqx_json:decode(Bridge3Str, [return_maps])
|
||||
),
|
||||
|
||||
%% send an message to emqx again, check the path has been changed
|
||||
|
@ -315,7 +315,7 @@ t_http_crud_apis(Config) ->
|
|||
<<"code">> := <<"NOT_FOUND">>,
|
||||
<<"message">> := _
|
||||
},
|
||||
jsx:decode(ErrMsg2)
|
||||
emqx_json:decode(ErrMsg2, [return_maps])
|
||||
),
|
||||
%% Deleting a non-existing bridge should result in an error
|
||||
{ok, 404, ErrMsg3} = request(delete, uri(["bridges", BridgeID]), []),
|
||||
|
@ -324,7 +324,7 @@ t_http_crud_apis(Config) ->
|
|||
<<"code">> := <<"NOT_FOUND">>,
|
||||
<<"message">> := _
|
||||
},
|
||||
jsx:decode(ErrMsg3)
|
||||
emqx_json:decode(ErrMsg3, [return_maps])
|
||||
),
|
||||
ok.
|
||||
|
||||
|
@ -402,7 +402,7 @@ t_check_dependent_actions_on_delete(Config) ->
|
|||
<<"sql">> => <<"SELECT * from \"t\"">>
|
||||
}
|
||||
),
|
||||
#{<<"id">> := RuleId} = jsx:decode(Rule),
|
||||
#{<<"id">> := RuleId} = emqx_json:decode(Rule, [return_maps]),
|
||||
%% delete the bridge should fail because there is a rule depenents on it
|
||||
{ok, 400, _} = request(delete, uri(["bridges", BridgeID]), []),
|
||||
%% delete the rule first
|
||||
|
@ -437,7 +437,7 @@ t_cascade_delete_actions(Config) ->
|
|||
<<"sql">> => <<"SELECT * from \"t\"">>
|
||||
}
|
||||
),
|
||||
#{<<"id">> := RuleId} = jsx:decode(Rule),
|
||||
#{<<"id">> := RuleId} = emqx_json:decode(Rule, [return_maps]),
|
||||
%% delete the bridge will also delete the actions from the rules
|
||||
{ok, 204, _} = request(delete, uri(["bridges", BridgeID]) ++ "?also_delete_dep_actions", []),
|
||||
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
||||
|
@ -446,7 +446,7 @@ t_cascade_delete_actions(Config) ->
|
|||
#{
|
||||
<<"actions">> := []
|
||||
},
|
||||
jsx:decode(Rule1)
|
||||
emqx_json:decode(Rule1, [return_maps])
|
||||
),
|
||||
{ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
|
||||
ok.
|
||||
|
@ -511,34 +511,39 @@ do_start_stop_bridges(Type, Config) ->
|
|||
<<"status">> := <<"connected">>,
|
||||
<<"node_status">> := [_ | _],
|
||||
<<"url">> := URL1
|
||||
} = jsx:decode(Bridge),
|
||||
} = emqx_json:decode(Bridge, [return_maps]),
|
||||
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
|
||||
%% stop it
|
||||
{ok, 204, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>),
|
||||
{ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []),
|
||||
?assertMatch(#{<<"status">> := <<"stopped">>}, jsx:decode(Bridge2)),
|
||||
?assertMatch(#{<<"status">> := <<"stopped">>}, emqx_json:decode(Bridge2, [return_maps])),
|
||||
%% start again
|
||||
{ok, 204, <<>>} = request(post, operation_path(Type, start, BridgeID), <<"">>),
|
||||
{ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
|
||||
?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3)),
|
||||
?assertMatch(#{<<"status">> := <<"connected">>}, emqx_json:decode(Bridge3, [return_maps])),
|
||||
%% start a started bridge
|
||||
{ok, 204, <<>>} = request(post, operation_path(Type, start, BridgeID), <<"">>),
|
||||
{ok, 200, Bridge3_1} = request(get, uri(["bridges", BridgeID]), []),
|
||||
?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3_1)),
|
||||
?assertMatch(#{<<"status">> := <<"connected">>}, emqx_json:decode(Bridge3_1, [return_maps])),
|
||||
%% restart an already started bridge
|
||||
{ok, 204, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>),
|
||||
{ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
|
||||
?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3)),
|
||||
?assertMatch(#{<<"status">> := <<"connected">>}, emqx_json:decode(Bridge3, [return_maps])),
|
||||
%% stop it again
|
||||
{ok, 204, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>),
|
||||
%% restart a stopped bridge
|
||||
{ok, 204, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>),
|
||||
{ok, 200, Bridge4} = request(get, uri(["bridges", BridgeID]), []),
|
||||
?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge4)),
|
||||
?assertMatch(#{<<"status">> := <<"connected">>}, emqx_json:decode(Bridge4, [return_maps])),
|
||||
%% delete the bridge
|
||||
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
|
||||
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
||||
|
||||
%% Fail parse-id check
|
||||
{ok, 404, _} = request(post, operation_path(Type, start, <<"wreckbook_fugazi">>), <<"">>),
|
||||
%% Looks ok but doesn't exist
|
||||
{ok, 404, _} = request(post, operation_path(Type, start, <<"webhook:cptn_hook">>), <<"">>),
|
||||
|
||||
%% Create broken bridge
|
||||
{ListenPort, Sock} = listen_on_random_port(),
|
||||
%% Connecting to this endpoint should always timeout
|
||||
|
@ -556,7 +561,7 @@ do_start_stop_bridges(Type, Config) ->
|
|||
<<"server">> := BadServer,
|
||||
<<"status">> := <<"connecting">>,
|
||||
<<"node_status">> := [_ | _]
|
||||
} = jsx:decode(BadBridge1),
|
||||
} = emqx_json:decode(BadBridge1, [return_maps]),
|
||||
BadBridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_MQTT, BadName),
|
||||
?assertMatch(
|
||||
{ok, SC, _} when SC == 500 orelse SC == 503,
|
||||
|
@ -585,24 +590,24 @@ t_enable_disable_bridges(Config) ->
|
|||
<<"status">> := <<"connected">>,
|
||||
<<"node_status">> := [_ | _],
|
||||
<<"url">> := URL1
|
||||
} = jsx:decode(Bridge),
|
||||
} = emqx_json:decode(Bridge, [return_maps]),
|
||||
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
|
||||
%% disable it
|
||||
{ok, 204, <<>>} = request(put, enable_path(false, BridgeID), <<"">>),
|
||||
{ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []),
|
||||
?assertMatch(#{<<"status">> := <<"stopped">>}, jsx:decode(Bridge2)),
|
||||
?assertMatch(#{<<"status">> := <<"stopped">>}, emqx_json:decode(Bridge2, [return_maps])),
|
||||
%% enable again
|
||||
{ok, 204, <<>>} = request(put, enable_path(true, BridgeID), <<"">>),
|
||||
{ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
|
||||
?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3)),
|
||||
?assertMatch(#{<<"status">> := <<"connected">>}, emqx_json:decode(Bridge3, [return_maps])),
|
||||
%% enable an already started bridge
|
||||
{ok, 204, <<>>} = request(put, enable_path(true, BridgeID), <<"">>),
|
||||
{ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
|
||||
?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3)),
|
||||
?assertMatch(#{<<"status">> := <<"connected">>}, emqx_json:decode(Bridge3, [return_maps])),
|
||||
%% disable it again
|
||||
{ok, 204, <<>>} = request(put, enable_path(false, BridgeID), <<"">>),
|
||||
|
||||
{ok, 400, Res} = request(post, operation_path(node, restart, BridgeID), <<"">>),
|
||||
{ok, 400, Res} = request(post, operation_path(node, start, BridgeID), <<"">>),
|
||||
?assertEqual(
|
||||
<<"{\"code\":\"BAD_REQUEST\",\"message\":\"Forbidden operation, bridge not enabled\"}">>,
|
||||
Res
|
||||
|
@ -611,7 +616,7 @@ t_enable_disable_bridges(Config) ->
|
|||
%% enable a stopped bridge
|
||||
{ok, 204, <<>>} = request(put, enable_path(true, BridgeID), <<"">>),
|
||||
{ok, 200, Bridge4} = request(get, uri(["bridges", BridgeID]), []),
|
||||
?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge4)),
|
||||
?assertMatch(#{<<"status">> := <<"connected">>}, emqx_json:decode(Bridge4, [return_maps])),
|
||||
%% delete the bridge
|
||||
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
|
||||
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []).
|
||||
|
@ -636,7 +641,7 @@ t_reset_bridges(Config) ->
|
|||
<<"status">> := <<"connected">>,
|
||||
<<"node_status">> := [_ | _],
|
||||
<<"url">> := URL1
|
||||
} = jsx:decode(Bridge),
|
||||
} = emqx_json:decode(Bridge, [return_maps]),
|
||||
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
|
||||
{ok, 204, <<>>} = request(put, uri(["bridges", BridgeID, "metrics/reset"]), []),
|
||||
|
||||
|
@ -704,7 +709,7 @@ t_bridges_probe(Config) ->
|
|||
<<"code">> := <<"TEST_FAILED">>,
|
||||
<<"message">> := _
|
||||
},
|
||||
jsx:decode(NxDomain)
|
||||
emqx_json:decode(NxDomain, [return_maps])
|
||||
),
|
||||
|
||||
{ok, 204, _} = request(
|
||||
|
@ -723,7 +728,7 @@ t_bridges_probe(Config) ->
|
|||
<<"code">> := <<"TEST_FAILED">>,
|
||||
<<"message">> := <<"Connection refused">>
|
||||
},
|
||||
jsx:decode(ConnRefused)
|
||||
emqx_json:decode(ConnRefused, [return_maps])
|
||||
),
|
||||
|
||||
{ok, 400, HostNotFound} = request(
|
||||
|
@ -736,7 +741,7 @@ t_bridges_probe(Config) ->
|
|||
<<"code">> := <<"TEST_FAILED">>,
|
||||
<<"message">> := <<"Host not found">>
|
||||
},
|
||||
jsx:decode(HostNotFound)
|
||||
emqx_json:decode(HostNotFound, [return_maps])
|
||||
),
|
||||
|
||||
AuthnConfig = #{
|
||||
|
@ -767,7 +772,7 @@ t_bridges_probe(Config) ->
|
|||
<<"code">> := <<"TEST_FAILED">>,
|
||||
<<"message">> := <<"Unauthorized client">>
|
||||
},
|
||||
jsx:decode(Unauthorized)
|
||||
emqx_json:decode(Unauthorized, [return_maps])
|
||||
),
|
||||
|
||||
{ok, 400, Malformed} = request(
|
||||
|
@ -782,7 +787,7 @@ t_bridges_probe(Config) ->
|
|||
<<"code">> := <<"TEST_FAILED">>,
|
||||
<<"message">> := <<"Malformed username or password">>
|
||||
},
|
||||
jsx:decode(Malformed)
|
||||
emqx_json:decode(Malformed, [return_maps])
|
||||
),
|
||||
|
||||
{ok, 400, NotAuthorized} = request(
|
||||
|
@ -795,7 +800,7 @@ t_bridges_probe(Config) ->
|
|||
<<"code">> := <<"TEST_FAILED">>,
|
||||
<<"message">> := <<"Not authorized">>
|
||||
},
|
||||
jsx:decode(NotAuthorized)
|
||||
emqx_json:decode(NotAuthorized, [return_maps])
|
||||
),
|
||||
|
||||
{ok, 400, BadReq} = request(
|
||||
|
@ -803,7 +808,7 @@ t_bridges_probe(Config) ->
|
|||
uri(["bridges_probe"]),
|
||||
?BRIDGE(<<"bad_bridge">>, <<"unknown_type">>)
|
||||
),
|
||||
?assertMatch(#{<<"code">> := <<"BAD_REQUEST">>}, jsx:decode(BadReq)),
|
||||
?assertMatch(#{<<"code">> := <<"BAD_REQUEST">>}, emqx_json:decode(BadReq, [return_maps])),
|
||||
ok.
|
||||
|
||||
t_metrics(Config) ->
|
||||
|
@ -829,7 +834,7 @@ t_metrics(Config) ->
|
|||
<<"status">> := _,
|
||||
<<"node_status">> := [_ | _],
|
||||
<<"url">> := URL1
|
||||
} = jsx:decode(Bridge),
|
||||
} = emqx_json:decode(Bridge, [return_maps]),
|
||||
|
||||
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
|
||||
|
||||
|
@ -840,12 +845,12 @@ t_metrics(Config) ->
|
|||
<<"metrics">> := #{<<"success">> := 0},
|
||||
<<"node_metrics">> := [_ | _]
|
||||
},
|
||||
jsx:decode(Bridge1Str)
|
||||
emqx_json:decode(Bridge1Str, [return_maps])
|
||||
),
|
||||
|
||||
%% check that the bridge doesn't contain metrics anymore
|
||||
{ok, 200, Bridge2Str} = request(get, uri(["bridges", BridgeID]), []),
|
||||
Decoded = jsx:decode(Bridge2Str),
|
||||
Decoded = emqx_json:decode(Bridge2Str, [return_maps]),
|
||||
?assertNot(maps:is_key(<<"metrics">>, Decoded)),
|
||||
?assertNot(maps:is_key(<<"node_metrics">>, Decoded)),
|
||||
|
||||
|
@ -875,7 +880,7 @@ t_metrics(Config) ->
|
|||
<<"metrics">> := #{<<"success">> := _},
|
||||
<<"node_metrics">> := [_ | _]
|
||||
},
|
||||
jsx:decode(Bridge3Str)
|
||||
emqx_json:decode(Bridge3Str, [return_maps])
|
||||
),
|
||||
|
||||
%% check for non-empty metrics when listing all bridges
|
||||
|
@ -887,7 +892,7 @@ t_metrics(Config) ->
|
|||
<<"node_metrics">> := [_ | _]
|
||||
}
|
||||
],
|
||||
jsx:decode(BridgesStr)
|
||||
emqx_json:decode(BridgesStr, [return_maps])
|
||||
),
|
||||
ok.
|
||||
|
||||
|
|
Loading…
Reference in New Issue