Pipelines¶
colored_icp_registration.py¶
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 | import open3d as o3d import numpy as np import copy def draw_registration_result(source, target, transformation): source_temp = copy.deepcopy(source) source_temp.transform(transformation) o3d.visualization.draw([source_temp, target]) print("Load two point clouds and show initial pose ...") ply_data = o3d.data.DemoColoredICPPointClouds() source = o3d.io.read_point_cloud(ply_data.paths[0]) target = o3d.io.read_point_cloud(ply_data.paths[1]) if __name__ == "__main__": # Draw initial alignment. current_transformation = np.identity(4) # draw_registration_result(source, target, current_transformation) print(current_transformation) # Colored pointcloud registration. # This is implementation of following paper: # J. Park, Q.-Y. Zhou, V. Koltun, # Colored Point Cloud Registration Revisited, ICCV 2017. voxel_radius = [0.04, 0.02, 0.01] max_iter = [50, 30, 14] current_transformation = np.identity(4) print("Colored point cloud registration ...\n") for scale in range(3): iter = max_iter[scale] radius = voxel_radius[scale] print([iter, radius, scale]) print("1. Downsample with a voxel size %.2f" % radius) source_down = source.voxel_down_sample(radius) target_down = target.voxel_down_sample(radius) print("2. Estimate normal") source_down.estimate_normals( o3d.geometry.KDTreeSearchParamHybrid(radius=radius * 2, max_nn=30)) target_down.estimate_normals( o3d.geometry.KDTreeSearchParamHybrid(radius=radius * 2, max_nn=30)) print("3. Applying colored point cloud registration") result_icp = o3d.pipelines.registration.registration_colored_icp( source_down, target_down, radius, current_transformation, o3d.pipelines.registration.TransformationEstimationForColoredICP(), o3d.pipelines.registration.ICPConvergenceCriteria( relative_fitness=1e-6, relative_rmse=1e-6, max_iteration=iter)) current_transformation = result_icp.transformation print(result_icp, "\n") # draw_registration_result(source, target, result_icp.transformation) print(current_transformation) |
icp_registration.py¶
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 | import open3d as o3d import numpy as np import copy def draw_registration_result(source, target, transformation): source_temp = copy.deepcopy(source) target_temp = copy.deepcopy(target) source_temp.paint_uniform_color([1, 0.706, 0]) target_temp.paint_uniform_color([0, 0.651, 0.929]) source_temp.transform(transformation) o3d.visualization.draw([source_temp, target_temp]) def point_to_point_icp(source, target, threshold, trans_init): print("Apply point-to-point ICP") reg_p2p = o3d.pipelines.registration.registration_icp( source, target, threshold, trans_init, o3d.pipelines.registration.TransformationEstimationPointToPoint()) print(reg_p2p) print("Transformation is:") print(reg_p2p.transformation, "\n") draw_registration_result(source, target, reg_p2p.transformation) def point_to_plane_icp(source, target, threshold, trans_init): print("Apply point-to-plane ICP") reg_p2l = o3d.pipelines.registration.registration_icp( source, target, threshold, trans_init, o3d.pipelines.registration.TransformationEstimationPointToPlane()) print(reg_p2l) print("Transformation is:") print(reg_p2l.transformation, "\n") draw_registration_result(source, target, reg_p2l.transformation) if __name__ == "__main__": pcd_data = o3d.data.DemoICPPointClouds() source = o3d.io.read_point_cloud(pcd_data.paths[0]) target = o3d.io.read_point_cloud(pcd_data.paths[1]) threshold = 0.02 trans_init = np.asarray([[0.862, 0.011, -0.507, 0.5], [-0.139, 0.967, -0.215, 0.7], [0.487, 0.255, 0.835, -1.4], [0.0, 0.0, 0.0, 1.0]]) draw_registration_result(source, target, trans_init) print("Initial alignment") evaluation = o3d.pipelines.registration.evaluate_registration( source, target, threshold, trans_init) print(evaluation, "\n") point_to_point_icp(source, target, threshold, trans_init) point_to_plane_icp(source, target, threshold, trans_init) |
multiway_registration.py¶
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 | import open3d as o3d import numpy as np def load_point_clouds(voxel_size=0.0): pcd_data = o3d.data.DemoICPPointClouds() pcds = [] for i in range(3): pcd = o3d.io.read_point_cloud(pcd_data.paths[i]) pcd_down = pcd.voxel_down_sample(voxel_size=voxel_size) pcds.append(pcd_down) return pcds def pairwise_registration(source, target, max_correspondence_distance_coarse, max_correspondence_distance_fine): print("Apply point-to-plane ICP") icp_coarse = o3d.pipelines.registration.registration_icp( source, target, max_correspondence_distance_coarse, np.identity(4), o3d.pipelines.registration.TransformationEstimationPointToPlane()) icp_fine = o3d.pipelines.registration.registration_icp( source, target, max_correspondence_distance_fine, icp_coarse.transformation, o3d.pipelines.registration.TransformationEstimationPointToPlane()) transformation_icp = icp_fine.transformation information_icp = o3d.pipelines.registration.get_information_matrix_from_point_clouds( source, target, max_correspondence_distance_fine, icp_fine.transformation) return transformation_icp, information_icp def full_registration(pcds, max_correspondence_distance_coarse, max_correspondence_distance_fine): pose_graph = o3d.pipelines.registration.PoseGraph() odometry = np.identity(4) pose_graph.nodes.append(o3d.pipelines.registration.PoseGraphNode(odometry)) n_pcds = len(pcds) for source_id in range(n_pcds): for target_id in range(source_id + 1, n_pcds): transformation_icp, information_icp = pairwise_registration( pcds[source_id], pcds[target_id], max_correspondence_distance_coarse, max_correspondence_distance_fine) print("Build o3d.pipelines.registration.PoseGraph") if target_id == source_id + 1: # odometry case odometry = np.dot(transformation_icp, odometry) pose_graph.nodes.append( o3d.pipelines.registration.PoseGraphNode( np.linalg.inv(odometry))) pose_graph.edges.append( o3d.pipelines.registration.PoseGraphEdge(source_id, target_id, transformation_icp, information_icp, uncertain=False)) else: # loop closure case pose_graph.edges.append( o3d.pipelines.registration.PoseGraphEdge(source_id, target_id, transformation_icp, information_icp, uncertain=True)) return pose_graph if __name__ == "__main__": voxel_size = 0.02 pcds_down = load_point_clouds(voxel_size) o3d.visualization.draw(pcds_down) print("Full registration ...") max_correspondence_distance_coarse = voxel_size * 15 max_correspondence_distance_fine = voxel_size * 1.5 with o3d.utility.VerbosityContextManager( o3d.utility.VerbosityLevel.Debug) as cm: pose_graph = full_registration(pcds_down, max_correspondence_distance_coarse, max_correspondence_distance_fine) print("Optimizing PoseGraph ...") option = o3d.pipelines.registration.GlobalOptimizationOption( max_correspondence_distance=max_correspondence_distance_fine, edge_prune_threshold=0.25, reference_node=0) with o3d.utility.VerbosityContextManager( o3d.utility.VerbosityLevel.Debug) as cm: o3d.pipelines.registration.global_optimization( pose_graph, o3d.pipelines.registration.GlobalOptimizationLevenbergMarquardt(), o3d.pipelines.registration.GlobalOptimizationConvergenceCriteria(), option) print("Transform points and display") for point_id in range(len(pcds_down)): print(pose_graph.nodes[point_id].pose) pcds_down[point_id].transform(pose_graph.nodes[point_id].pose) o3d.visualization.draw(pcds_down) |
pose_graph_optimization.py¶
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 | import open3d as o3d import numpy as np import os if __name__ == "__main__": o3d.utility.set_verbosity_level(o3d.utility.VerbosityLevel.Debug) print("") print( "Parameters for o3d.pipelines.registration.PoseGraph optimization ...") method = o3d.pipelines.registration.GlobalOptimizationLevenbergMarquardt() criteria = o3d.pipelines.registration.GlobalOptimizationConvergenceCriteria( ) option = o3d.pipelines.registration.GlobalOptimizationOption() print("") print(method) print(criteria) print(option) print("") print( "Optimizing Fragment o3d.pipelines.registration.PoseGraph using open3d ..." ) pose_graph_data = o3d.data.DemoPoseGraphOptimization() pose_graph_fragment = o3d.io.read_pose_graph( pose_graph_data.pose_graph_fragment_path) print(pose_graph_fragment) o3d.pipelines.registration.global_optimization(pose_graph_fragment, method, criteria, option) o3d.io.write_pose_graph( os.path.join('pose_graph_example_fragment_optimized.json'), pose_graph_fragment) print("") print( "Optimizing Global o3d.pipelines.registration.PoseGraph using open3d ..." ) pose_graph_global = o3d.io.read_pose_graph( pose_graph_data.pose_graph_global_path) print(pose_graph_global) o3d.pipelines.registration.global_optimization(pose_graph_global, method, criteria, option) o3d.io.write_pose_graph( os.path.join('pose_graph_example_global_optimized.json'), pose_graph_global) |
registration_fgr.py¶
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 | import open3d as o3d import argparse def preprocess_point_cloud(pcd, voxel_size): pcd_down = pcd.voxel_down_sample(voxel_size) pcd_down.estimate_normals( o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size * 2.0, max_nn=30)) pcd_fpfh = o3d.pipelines.registration.compute_fpfh_feature( pcd_down, o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size * 5.0, max_nn=100)) return (pcd_down, pcd_fpfh) if __name__ == '__main__': pcd_data = o3d.data.DemoICPPointClouds() parser = argparse.ArgumentParser( 'Global point cloud registration example with RANSAC') parser.add_argument('src', type=str, default=pcd_data.paths[0], nargs='?', help='path to src point cloud') parser.add_argument('dst', type=str, default=pcd_data.paths[1], nargs='?', help='path to dst point cloud') parser.add_argument('--voxel_size', type=float, default=0.05, help='voxel size in meter used to downsample inputs') parser.add_argument( '--distance_multiplier', type=float, default=1.5, help='multipler used to compute distance threshold' 'between correspondences.' 'Threshold is computed by voxel_size * distance_multiplier.') parser.add_argument('--max_iterations', type=int, default=64, help='number of max FGR iterations') parser.add_argument( '--max_tuples', type=int, default=1000, help='max number of accepted tuples for correspondence filtering') args = parser.parse_args() voxel_size = args.voxel_size distance_threshold = args.distance_multiplier * voxel_size o3d.utility.set_verbosity_level(o3d.utility.VerbosityLevel.Debug) print('Reading inputs') src = o3d.io.read_point_cloud(args.src) dst = o3d.io.read_point_cloud(args.dst) print('Downsampling inputs') src_down, src_fpfh = preprocess_point_cloud(src, voxel_size) dst_down, dst_fpfh = preprocess_point_cloud(dst, voxel_size) print('Running FGR') result = o3d.pipelines.registration.registration_fgr_based_on_feature_matching( src_down, dst_down, src_fpfh, dst_fpfh, o3d.pipelines.registration.FastGlobalRegistrationOption( maximum_correspondence_distance=distance_threshold, iteration_number=args.max_iterations, maximum_tuple_count=args.max_tuples)) src.paint_uniform_color([1, 0, 0]) dst.paint_uniform_color([0, 1, 0]) o3d.visualization.draw([src.transform(result.transformation), dst]) |
registration_ransac.py¶
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 | import open3d as o3d import argparse def preprocess_point_cloud(pcd, voxel_size): pcd_down = pcd.voxel_down_sample(voxel_size) pcd_down.estimate_normals( o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size * 2.0, max_nn=30)) pcd_fpfh = o3d.pipelines.registration.compute_fpfh_feature( pcd_down, o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size * 5.0, max_nn=100)) return (pcd_down, pcd_fpfh) if __name__ == '__main__': pcd_data = o3d.data.DemoICPPointClouds() parser = argparse.ArgumentParser( 'Global point cloud registration example with RANSAC') parser.add_argument('src', type=str, default=pcd_data.paths[0], nargs='?', help='path to src point cloud') parser.add_argument('dst', type=str, default=pcd_data.paths[1], nargs='?', help='path to dst point cloud') parser.add_argument('--voxel_size', type=float, default=0.05, help='voxel size in meter used to downsample inputs') parser.add_argument( '--distance_multiplier', type=float, default=1.5, help='multipler used to compute distance threshold' 'between correspondences.' 'Threshold is computed by voxel_size * distance_multiplier.') parser.add_argument('--max_iterations', type=int, default=1000000, help='number of max RANSAC iterations') parser.add_argument('--confidence', type=float, default=0.999, help='RANSAC confidence') parser.add_argument( '--mutual_filter', action='store_true', help='whether to use mutual filter for putative correspondences') args = parser.parse_args() voxel_size = args.voxel_size distance_threshold = args.distance_multiplier * voxel_size o3d.utility.set_verbosity_level(o3d.utility.VerbosityLevel.Debug) print('Reading inputs') src = o3d.io.read_point_cloud(args.src) dst = o3d.io.read_point_cloud(args.dst) print('Downsampling inputs') src_down, src_fpfh = preprocess_point_cloud(src, voxel_size) dst_down, dst_fpfh = preprocess_point_cloud(dst, voxel_size) print('Running RANSAC') result = o3d.pipelines.registration.registration_ransac_based_on_feature_matching( src_down, dst_down, src_fpfh, dst_fpfh, mutual_filter=args.mutual_filter, max_correspondence_distance=distance_threshold, estimation_method=o3d.pipelines.registration. TransformationEstimationPointToPoint(False), ransac_n=3, checkers=[ o3d.pipelines.registration.CorrespondenceCheckerBasedOnEdgeLength( 0.9), o3d.pipelines.registration.CorrespondenceCheckerBasedOnDistance( distance_threshold) ], criteria=o3d.pipelines.registration.RANSACConvergenceCriteria( args.max_iterations, args.confidence)) src.paint_uniform_color([1, 0, 0]) dst.paint_uniform_color([0, 1, 0]) o3d.visualization.draw([src.transform(result.transformation), dst]) |
rgbd_integration_uniform.py¶
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 | import open3d as o3d import numpy as np import os, sys pyexample_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(pyexample_path) from open3d_example import read_trajectory if __name__ == "__main__": rgbd_data = o3d.data.SampleRedwoodRGBDImages() camera_poses = read_trajectory(rgbd_data.odometry_log_path) camera_intrinsics = o3d.camera.PinholeCameraIntrinsic( o3d.camera.PinholeCameraIntrinsicParameters.PrimeSenseDefault) volume = o3d.pipelines.integration.UniformTSDFVolume( length=4.0, resolution=512, sdf_trunc=0.04, color_type=o3d.pipelines.integration.TSDFVolumeColorType.RGB8, ) for i in range(len(camera_poses)): print("Integrate {:d}-th image into the volume.".format(i)) color = o3d.io.read_image(rgbd_data.color_paths[i]) depth = o3d.io.read_image(rgbd_data.depth_paths[i]) rgbd = o3d.geometry.RGBDImage.create_from_color_and_depth( color, depth, depth_trunc=4.0, convert_rgb_to_intensity=False) volume.integrate( rgbd, camera_intrinsics, np.linalg.inv(camera_poses[i].pose), ) print("Extract triangle mesh") mesh = volume.extract_triangle_mesh() mesh.compute_vertex_normals() o3d.visualization.draw_geometries([mesh]) print("Extract voxel-aligned debugging point cloud") voxel_pcd = volume.extract_voxel_point_cloud() o3d.visualization.draw_geometries([voxel_pcd]) print("Extract voxel-aligned debugging voxel grid") voxel_grid = volume.extract_voxel_grid() # o3d.visualization.draw_geometries([voxel_grid]) # print("Extract point cloud") # pcd = volume.extract_point_cloud() # o3d.visualization.draw_geometries([pcd]) |
rgbd_odometry.py¶
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 | import open3d as o3d import numpy as np if __name__ == "__main__": pinhole_camera_intrinsic = o3d.camera.PinholeCameraIntrinsic( o3d.camera.PinholeCameraIntrinsicParameters.PrimeSenseDefault) rgbd_data = o3d.data.SampleRedwoodRGBDImages() source_color = o3d.io.read_image(rgbd_data.color_paths[0]) source_depth = o3d.io.read_image(rgbd_data.depth_paths[0]) target_color = o3d.io.read_image(rgbd_data.color_paths[1]) target_depth = o3d.io.read_image(rgbd_data.depth_paths[1]) source_rgbd_image = o3d.geometry.RGBDImage.create_from_color_and_depth( source_color, source_depth) target_rgbd_image = o3d.geometry.RGBDImage.create_from_color_and_depth( target_color, target_depth) target_pcd = o3d.geometry.PointCloud.create_from_rgbd_image( target_rgbd_image, pinhole_camera_intrinsic) option = o3d.pipelines.odometry.OdometryOption() odo_init = np.identity(4) print(option) [success_color_term, trans_color_term, info] = o3d.pipelines.odometry.compute_rgbd_odometry( source_rgbd_image, target_rgbd_image, pinhole_camera_intrinsic, odo_init, o3d.pipelines.odometry.RGBDOdometryJacobianFromColorTerm(), option) [success_hybrid_term, trans_hybrid_term, info] = o3d.pipelines.odometry.compute_rgbd_odometry( source_rgbd_image, target_rgbd_image, pinhole_camera_intrinsic, odo_init, o3d.pipelines.odometry.RGBDOdometryJacobianFromHybridTerm(), option) if success_color_term: print("Using RGB-D Odometry") print(trans_color_term) source_pcd_color_term = o3d.geometry.PointCloud.create_from_rgbd_image( source_rgbd_image, pinhole_camera_intrinsic) source_pcd_color_term.transform(trans_color_term) o3d.visualization.draw([target_pcd, source_pcd_color_term]) if success_hybrid_term: print("Using Hybrid RGB-D Odometry") print(trans_hybrid_term) source_pcd_hybrid_term = o3d.geometry.PointCloud.create_from_rgbd_image( source_rgbd_image, pinhole_camera_intrinsic) source_pcd_hybrid_term.transform(trans_hybrid_term) o3d.visualization.draw([target_pcd, source_pcd_hybrid_term]) |
robust_icp.py¶
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 | import open3d as o3d import numpy as np import copy def draw_registration_result(source, target, transformation): source_temp = copy.deepcopy(source) target_temp = copy.deepcopy(target) source_temp.paint_uniform_color([1, 0.706, 0]) target_temp.paint_uniform_color([0, 0.651, 0.929]) source_temp.transform(transformation) o3d.visualization.draw([source_temp, target_temp]) def apply_noise(pcd, mu, sigma): noisy_pcd = copy.deepcopy(pcd) points = np.asarray(noisy_pcd.points) points += np.random.normal(mu, sigma, size=points.shape) noisy_pcd.points = o3d.utility.Vector3dVector(points) return noisy_pcd if __name__ == "__main__": pcd_data = o3d.data.DemoICPPointClouds() source = o3d.io.read_point_cloud(pcd_data.paths[0]) target = o3d.io.read_point_cloud(pcd_data.paths[1]) trans_init = np.asarray([[0.862, 0.011, -0.507, 0.5], [-0.139, 0.967, -0.215, 0.7], [0.487, 0.255, 0.835, -1.4], [0.0, 0.0, 0.0, 1.0]]) # Mean and standard deviation. mu, sigma = 0, 0.1 source_noisy = apply_noise(source, mu, sigma) print("Displaying source point cloud + noise:") o3d.visualization.draw([source_noisy]) print( "Displaying original source and target point cloud with initial transformation:" ) draw_registration_result(source, target, trans_init) threshold = 1.0 print("Using the noisy source pointcloud to perform robust ICP.\n") print("Robust point-to-plane ICP, threshold={}:".format(threshold)) loss = o3d.pipelines.registration.TukeyLoss(k=sigma) print("Using robust loss:", loss) p2l = o3d.pipelines.registration.TransformationEstimationPointToPlane(loss) reg_p2l = o3d.pipelines.registration.registration_icp( source_noisy, target, threshold, trans_init, p2l) print(reg_p2l) print("Transformation is:") print(reg_p2l.transformation) draw_registration_result(source, target, reg_p2l.transformation) |