Skip to content

Commit

Permalink
gen_cfeatures.py worked with GPU. With a normal GPU, throughput was about…
Browse files Browse the repository at this point in the history
… 0.09 sec/file.
  • Loading branch information
ryogrid committed Nov 7, 2024
1 parent 1da46d1 commit f3acb77
Showing 1 changed file with 82 additions and 120 deletions.
202 changes: 82 additions & 120 deletions gen_cfeatures.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,23 @@
import datetime
import os, time

import pandas as pd
import argparse
import traceback, sys
import re
from pathlib import Path
from typing import List, Tuple, Dict, Any, Optional, Callable, Protocol

import numpy as np
from numpy import signedinteger
from PIL import Image
from huggingface_hub import hf_hub_download
from huggingface_hub.utils import HfHubHTTPError
import concurrent.futures

import json
import os.path
from functools import lru_cache
from typing import Union, List
from typing import Union, List, Optional

import numpy as np
from PIL import Image
from huggingface_hub import hf_hub_download, HfFileSystem

from imgutils.data import MultiImagesTyping, load_images, ImageTyping
from imgutils.utils import open_onnx_model
from onnxruntime import InferenceSession

try:
from typing import Literal
Expand All @@ -34,7 +26,6 @@

hf_fs = HfFileSystem()


_VALID_MODEL_NAMES = [
os.path.basename(os.path.dirname(file)) for file in
hf_fs.glob('deepghs/ccip_onnx/*/model.ckpt')
Expand All @@ -44,8 +35,8 @@

EXTENSIONS: List[str] = ['.png', '.jpg', '.jpeg', ".PNG", ".JPG", ".JPEG"]

BATCH_SIZE: int = 10 # max size for M1 MBA GPU
PROGRESS_INTERVAL: int = 1000
BATCH_SIZE: int = 20
PROGRESS_INTERVAL: int = 100

WORKER_NUM: int = 8

Expand All @@ -64,14 +55,9 @@ def print_traceback() -> None:

class Predictor:
def __init__(self) -> None:
pass
# self.last_loaded_repo: Optional[str] = None
self.embed_model: Optional[InferenceSession] = None
# self.tagger_model: Optional[nn.Module] = None
# self.tag_names: Optional[List[str]] = None
# self.rating_index: Optional[List[int]] = None
# self.general_index: Optional[List[int]] = None
# self.character_index: Optional[List[int]] = None
# self.transform: Optional[Callable] = None


def list_files_recursive(self, dir_path: str) -> List[str]:
file_list: List[str] = []
Expand Down Expand Up @@ -104,42 +90,8 @@ def list_files_recursive(self, dir_path: str) -> List[str]:
#
# return padded_image

# def load_model(self) -> None:
# if self.tagger_model is not None:
# return
#
# self.tagger_model = timm.create_model("hf-hub:" + TAGGER_VIT_MODEL_REPO).eval()
# state_dict = timm.models.load_state_dict_from_hf(TAGGER_VIT_MODEL_REPO)
# self.tagger_model.load_state_dict(state_dict)
#
# print("Loading tag list...")
# self.load_labels_hf(repo_id=TAGGER_VIT_MODEL_REPO)
#
# print("Creating data transform...")
# self.transform = create_transform(**resolve_data_config(self.tagger_model.pretrained_cfg, model=self.tagger_model))

def write_to_file(self, csv_line: str) -> None:
self.f.write(csv_line + '\n')

# def gen_image_tensor(self, file_path: str) -> Tensor | None:
# img: Image.Image = None
# try:
# img = Image.open(file_path)
# img.load()
# img_tmp = self.prepare_image(img)
# # run the model's input transform to convert to tensor and rescale
# input: Tensor = self.transform(img_tmp)
# # NCHW image RGB to BGR
# input = input[[2, 1, 0]]
# return input
# except Exception as e:
# if img is not None:
# img.close()
# error_class: type = type(e)
# error_description: str = str(e)
# err_msg: str = '%s: %s' % (error_class, error_description)
# print(err_msg)
# return None
# def write_to_file(self, csv_line: str) -> None:
# self.f.write(csv_line + '\n')

def filter_files_by_date(self, file_list: List[str], added_date: datetime.date) -> List[str]:
filtered_list: List[str] = []
Expand All @@ -163,31 +115,35 @@ def _preprocess_image(self, image: Image.Image, size: int = 384):

return data

@lru_cache()
def _open_feat_model(self, model):
return open_onnx_model(hf_hub_download(
f'deepghs/ccip_onnx',
f'{model}/model_feat.onnx',
))

@lru_cache()
def _open_metric_model(self, model):
#@lru_cache()
def _open_feat_model(self, model) -> InferenceSession:
return open_onnx_model(hf_hub_download(
f'deepghs/ccip_onnx',
f'{model}/model_metrics.onnx',
))

@lru_cache()
def _open_metrics(self, model):
with open(hf_hub_download(f'deepghs/ccip_onnx', f'{model}/metrics.json'), 'r') as f:
return json.load(f)

@lru_cache()
def _open_cluster_metrics(self, model):
with open(hf_hub_download(f'deepghs/ccip_onnx', f'{model}/cluster.json'), 'r') as f:
return json.load(f)

def ccip_batch_extract_features(self, images: MultiImagesTyping, size: int = 384, model: str = _DEFAULT_MODEL_NAMES):
f'deepghs/ccip_onnx',
f'{model}/model_feat.onnx',
),
mode = 'CUDAExecutionProvider',
)

# @lru_cache()
# def _open_metric_model(self, model):
# return open_onnx_model(hf_hub_download(
# f'deepghs/ccip_onnx',
# f'{model}/model_metrics.onnx',
# ))
#
# @lru_cache()
# def _open_metrics(self, model):
# with open(hf_hub_download(f'deepghs/ccip_onnx', f'{model}/metrics.json'), 'r') as f:
# return json.load(f)
#
# @lru_cache()
# def _open_cluster_metrics(self, model):
# with open(hf_hub_download(f'deepghs/ccip_onnx', f'{model}/cluster.json'), 'r') as f:
# return json.load(f)

#def ccip_batch_extract_features(self, images: MultiImagesTyping, size: int = 384, model: str = _DEFAULT_MODEL_NAMES):
def ccip_batch_extract_features(self, images: List[np.ndarray], size: int = 384,
model: str = _DEFAULT_MODEL_NAMES) -> np.ndarray:
"""
Extracts the feature vectors of multiple images using the specified model.
:param images: The input images from which to extract the feature vectors.
Expand All @@ -207,9 +163,11 @@ def ccip_batch_extract_features(self, images: MultiImagesTyping, size: int = 384
>>> feat.shape, feat.dtype
((3, 768), dtype('float32'))
"""
images = load_images(images, mode='RGB')
data = np.stack([self._preprocess_image(item, size=size) for item in images]).astype(np.float32)
output, = self._open_feat_model(model).run(['output'], {'input': data})
# images = load_images(images, mode='RGB')
# data = np.stack([self._preprocess_image(item, size=size) for item in images]).astype(np.float32)
data = np.stack(images).astype(np.float32)
# output, = self._open_feat_model(model).run(['output'], {'input': data})
output, = self.embed_model.run(['output'], {'input': data})
return output

def ccip_extract_feature(self, image: ImageTyping, size: int = 384, model: str = _DEFAULT_MODEL_NAMES):
Expand All @@ -234,19 +192,22 @@ def ccip_extract_feature(self, image: ImageTyping, size: int = 384, model: str =
"""
return self.ccip_batch_extract_features([image], size, model)[0]

_FeatureOrImage = Union[ImageTyping, np.ndarray]
# _FeatureOrImage = Union[ImageTyping, np.ndarray]

def _p_feature(self, x: _FeatureOrImage, size: int = 384, model: str = _DEFAULT_MODEL_NAMES):
if isinstance(x, np.ndarray): # if feature
return x
else: # is image or path
return self.ccip_extract_feature(x, size, model)
# def _p_feature(self, x: _FeatureOrImage, size: int = 384, model: str = _DEFAULT_MODEL_NAMES):
# if isinstance(x, np.ndarray): # if feature
# return x
# else: # is image or path
# return self.ccip_extract_feature(x, size, model)

def predict(
self,
tensors: List[np.ndarray],
) -> List[str]:
pass
images: List[np.ndarray],
) -> np.ndarray:
print("Running inference...")
ret = self.ccip_batch_extract_features(images)
print("Processing results...")
return ret
# batched_tensor = torch.stack(tensors, dim=0)
#
# print("Running inference...")
Expand All @@ -267,28 +228,26 @@ def predict(
# print("Processing results...")
# preds = outputs.numpy()

def gen_image_ndarray(self, file_path) -> np.ndarray | None:
    """Load one image file and turn it into a preprocessed array.

    The file is opened through ``load_images`` in RGB mode and the
    resulting image is handed to ``self._preprocess_image`` with
    ``size=384``.

    :param file_path: Path of the image file to load.
    :return: The array produced by ``_preprocess_image``, or ``None`` when
        loading or preprocessing raised. The error is printed, not re-raised.
    """
    try:
        loaded = load_images([file_path], mode='RGB')
        return self._preprocess_image(loaded[0], size=384)
    except Exception as exc:
        # Best effort: report the failure and signal it to the caller
        # with None instead of aborting the whole run.
        print(f'{type(exc)}: {exc}')
        return None

def process_directory(self, dir_path: str, added_date: datetime.date | None = None) -> None:
file_list: List[str] = self.list_files_recursive(dir_path)
print(f'{len(file_list)} files found')

# tag new images after specified date
if added_date is not None:
file_list = self.filter_files_by_date(file_list, added_date)
print(f'{len(file_list)} files found after {added_date}')
# backup tags-wd-tagger.txt with copying to tags-wd-tagger.txt.bak
if os.path.exists('tags-wd-tagger.txt'):
with open('tags-wd-tagger.txt', 'r', encoding='utf-8') as f:
with open('tags-wd-tagger.txt.bak', 'w', encoding='utf-8') as f_bak:
f_bak.write(f.read())
else:
print('tags-wd-tagger.txt not found')
exit(1)

self.f = open('tags-wd-tagger.txt', 'a', encoding='utf-8')

self.load_model()
# self.load_model()
self.embed_model = self._open_feat_model(_DEFAULT_MODEL_NAMES)

tensors: List[Tensor] = []
ndarrs: List[np.ndarray] = []
fpathes: List[str] = []
start: float = time.perf_counter()
last_cnt: int = 0
Expand All @@ -297,37 +256,40 @@ def process_directory(self, dir_path: str, added_date: datetime.date | None = No
passed_idx: int = 0
with concurrent.futures.ThreadPoolExecutor(max_workers=WORKER_NUM) as executor:
# dispatch get Tensor task to processes
future_to_path = {executor.submit(self.gen_image_tensor, file_path): file_path for file_path in
future_to_path = {executor.submit(self.gen_image_ndarray, file_path): file_path for file_path in
file_list[0: BATCH_SIZE]}
passed_idx += BATCH_SIZE
while passed_idx < len(file_list):
for future in concurrent.futures.as_completed(future_to_path):
path = future_to_path[future]
try:
tensor = future.result()
if tensor is None:
ndarr = future.result()
if ndarr is None:
failed_cnt += 1
cnt -= 1
# continue

if tensor is not None:
tensors.append(tensor)
if ndarr is not None:
ndarrs.append(ndarr)
fpathes.append(path)

if len(tensors) >= BATCH_SIZE - failed_cnt:
if len(ndarrs) >= BATCH_SIZE - failed_cnt:
# submit load Tensor tasks for next batch
end_idx = passed_idx + BATCH_SIZE
if end_idx > len(file_list):
end_idx = len(file_list)
future_to_path = {executor.submit(self.gen_image_tensor, file_path): file_path for file_path
future_to_path = {executor.submit(self.gen_image_ndarray, file_path): file_path for file_path
in file_list[passed_idx: end_idx]}
passed_idx = end_idx

# run inference
results_in_csv_format: List[str] = self.predict(tensors)
for idx, line in enumerate(results_in_csv_format):
self.write_to_file(fpathes[idx] + ',' + line)
tensors = []
# dimension of results: (batch_size, 768)
results: np.ndarray = self.predict(ndarrs)
# for idx, line in enumerate(results_in_csv_format):
# self.write_to_file(fpathes[idx] + ',' + line)
# for arr in results:
# print(arr.astype(float))
ndarrs = []
fpathes = []
failed_cnt = 0

Expand Down

0 comments on commit f3acb77

Please sign in to comment.