Image processing between two CNN
实现了一个对物体进行图像分割,提取感兴趣区域,并进行遮挡恢复和图像合并的流水线处理
- img_selected函数:
从指定文件夹中遍历所有子文件夹 对每个子文件夹中的metadata.csv文件进行解析 根据面积阈值筛选图片并保存到输出目录
- img_recovered函数:
遍历img_selected输出目录中的所有子文件夹 对每个子文件夹,读取原始图像,并用子文件夹中的遮挡图像来恢复被遮挡的区域 将恢复后的图像保存到输出目录
- img_merged函数:
遍历img_recovered目录中的所有子文件夹 对每个子文件夹,将其中的所有图像叠加合并为一张图片 将合并后的图片保存到输出目录
- main函数:
调用上述三个函数,完成图像选择、恢复和合并的完整流水线 允许通过参数配置输入输出文件夹等
import pandas as pd
import os
from PIL import Image
import cv2
import numpy as np
from rich.progress import Progress, BarColumn, SpinnerColumn, TimeElapsedColumn, TimeRemainingColumn
import time
def wu555_progress():
return Progress(
"[progress.description]{task.description}({task.completed}/{task.total})",
SpinnerColumn(finished_text="[green]✔"),
BarColumn(),
"[progress.percentage]{task.percentage:>3.2f}%",
"[yellow]⏱ ",
TimeElapsedColumn(),
"[cyan]⏳",
TimeRemainingColumn(),
"[red]Wu~"
)
def img_selected(folder_path, lower_threshold, upper_threshold, selected_output_folder):
sequence_folders = os.listdir(folder_path)
progress = wu555_progress()
task = progress.add_task("[cyan]Processing...", total=len(sequence_folders))
with progress:
for sequence_folder in sequence_folders:
progress.update(task, advance=1)
folder = os.path.join(folder_path, sequence_folder)
if not os.path.isdir(folder):
continue
csv_file = os.path.join(folder, 'metadata.csv')
df = pd.read_csv(csv_file)
output_directory = os.path.join(selected_output_folder, sequence_folder)
if not os.path.exists(output_directory):
os.makedirs(output_directory)
current_dir = os.path.dirname(os.path.abspath(csv_file))
for index, row in df.iterrows():
image_filename = str(int(row['id'])) + '.png'
area = row['area']
if lower_threshold < area < upper_threshold:
image_path = os.path.join(current_dir, image_filename)
output_path = os.path.join(output_directory, image_filename)
image = Image.open(image_path)
image.save(output_path)
# time.sleep(0.1)
# print(f'Saved image: {output_path}')
print('All selected images have been saved')
def img_recovered(img_selected_folder, recovered_output_folder, original_image_folder):
if not os.path.exists(recovered_output_folder):
os.makedirs(recovered_output_folder)
progress = wu555_progress()
task = progress.add_task("[cyan]Processing...", total=len(os.listdir(img_selected_folder)))
with progress:
for folder_name in os.listdir(img_selected_folder):
progress.update(task, advance=1)
folder_path = os.path.join(img_selected_folder, folder_name)
if not os.path.isdir(folder_path):
continue
output_subfolder = os.path.join(recovered_output_folder, folder_name)
if not os.path.exists(output_subfolder):
os.makedirs(output_subfolder)
original_image_path = os.path.join(original_image_folder, folder_name + '.jpg')
original_image = cv2.imread(original_image_path)
for filename in os.listdir(folder_path):
if filename.endswith('.png'):
masked_image_path = os.path.join(folder_path, filename)
masked_image = cv2.imread(masked_image_path)
if original_image is None:
continue
masked_gray = cv2.cvtColor(masked_image, cv2.COLOR_BGR2GRAY)
mask = np.zeros_like(original_image)
mask[masked_gray == 255] = original_image[masked_gray == 255]
restored_image = masked_image.copy()
restored_image[masked_gray == 255] = mask[masked_gray == 255]
output_path = os.path.join(output_subfolder, filename)
cv2.imwrite(output_path, restored_image)
# print(f'Saved image: {output_path}')
print('All recovered images have been saved')
def img_merged(recovered_output_folder, merged_output_folder):
subfolder_paths = [
os.path.join(recovered_output_folder, subfolder)
for subfolder in os.listdir(recovered_output_folder)
if os.path.isdir(os.path.join(recovered_output_folder, subfolder))
]
progress = wu555_progress()
task = progress.add_task("[cyan]Processing...", total=len(subfolder_paths))
with progress:
for subfolder_path in subfolder_paths:
progress.update(task, advance=1)
image_paths = [
os.path.join(subfolder_path, filename)
for filename in os.listdir(subfolder_path)
if filename.endswith(".jpg") or filename.endswith(".png")
]
first_image = cv2.imread(image_paths[0])
target_height, target_width, _ = first_image.shape
target_image = np.zeros((target_height, target_width, 3), np.uint8)
for image_path in image_paths:
image = cv2.imread(image_path)
mask = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) != 0
target_image[mask] = image[mask]
target_folder_path = merged_output_folder
os.makedirs(target_folder_path, exist_ok=True)
target_image_name = os.path.basename(subfolder_path) + ".jpg"
target_image_path = os.path.join(target_folder_path, target_image_name)
cv2.imwrite(target_image_path, target_image)
# print(f"Saved image: {target_image_path}")
print('All merged images have been saved')
def main():
folder_path = 'segment-anything/outputimg'
lower_threshold = 40000
upper_threshold = 80000
selected_output_folder = 'img_selected'
original_image_folder = 'segment-anything/inputimg'
recovered_output_folder = 'img_recovered'
merged_output_folder = "img_merged"
img_selected(folder_path, lower_threshold, upper_threshold, selected_output_folder)
img_recovered(selected_output_folder, recovered_output_folder, original_image_folder)
img_merged(recovered_output_folder, merged_output_folder)
if __name__ == '__main__':
main()