add original code.

This commit is contained in:
Licsber 2022-03-02 19:32:08 +08:00
parent ba030ca198
commit 8536ade252
58 changed files with 4222 additions and 0 deletions

6 sh/activate.sh Normal file

@@ -0,0 +1,6 @@
export PYTHONPATH=/home/licsber/services/gxs/src
PY=/home/licsber/anaconda3/envs/gxs-36/bin/python
hostname
echo $PYTHONPATH
echo $PY

11 sh/deploy.sh Normal file

@@ -0,0 +1,11 @@
#!/usr/bin/env zsh
SRC=/Users/licsber/Coding/Python/2021工训赛/
DST=192.168.1.102:/home/licsber/gx/
rsync -rtvzhP $SRC $DST --delete-after --exclude "venv/" --exclude "__pycache__/" --exclude "*.onnx" --exclude "*.engine" --exclude ".git/"
SRC=/Users/licsber/datasets/工训赛/models/
cd "$SRC" || exit
rsync -rtvzhP ssd-mobilenet.onnx $DST
rsync -rtvzhP labels.txt $DST
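
A dry run is cheap insurance before a --delete-after sync; rsync's standard -n flag lists what would change without copying anything (a sketch, same flags as above):

    rsync -rtvzhPn $SRC $DST --delete-after --exclude "venv/"  # -n: report only, transfer nothing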

0 sh/run.sh Normal file

17 sh/ser.sh Normal file

@@ -0,0 +1,17 @@
#!/usr/bin/env zsh
SRC=/Users/licsber/Coding/Python/2021工训赛/
DST=ser:/home/licsber/services/gxs/
rsync -rtvzhP $SRC $DST --delete-after --exclude "venv/" --exclude "__pycache__/"
SRC=/Users/licsber/datasets/工训赛/models/
DST=ser:/datasets/工训赛/models
cd "$SRC" || exit
rsync -tvzhP labels.txt $DST
rsync -rtvzhP $SRC/../voc/ $DST/../voc --delete-after
rsync -tvzhP mobilenet-v1-ssd-mp-0_675.pth $DST
rsync -tvzhP ser:/datasets/工训赛/models/mb1-ssd-Epoch-60-Loss-1.0784624990294962.pth /Users/licsber/datasets/工训赛/models/
#rsync -tvzhP ssd-mobilenet.onnx $DST
#rsync -tvzhP mb1-ssd-Epoch-28-Loss-1.1538286421980177.pth $DST

3 sh/train.sh Normal file

@@ -0,0 +1,3 @@
source activate.sh
$PY "$PYTHONPATH/train.py" "$@"

15 src/0_extract_video.py Normal file

@@ -0,0 +1,15 @@
import cv2
from config import VIDEO_PATH, IMG_PATH
count = 0
for avi in VIDEO_PATH.glob('*.avi'):
cap = cv2.VideoCapture(str(avi))
    while True:
        suc, bgr = cap.read()
        if not suc:
            break
        save_name = IMG_PATH / f"{count}.jpg"
        cv2.imwrite(str(save_name), bgr)
        count += 1
    cap.release()  # release the capture before moving on to the next video

8 src/1_rename_img.py Normal file

@@ -0,0 +1,8 @@
from config import IMG_PATH
count = 0
all_files = list(IMG_PATH.glob('*.jpg'))
all_files.sort()
for img in all_files:
img.rename(img.parent / f"{count:06d}.jpg")
count += 1

48 src/2_make_voc.py Normal file

@@ -0,0 +1,48 @@
import random
from config import IMG_PATH, VOC_PATH, CLASSES
random.seed(233)
annos = VOC_PATH / 'Annotations'
datasets = VOC_PATH / 'ImageSets' / 'Main'
images = VOC_PATH / 'JPEGImages'
annos.mkdir(exist_ok=True)
datasets.mkdir(parents=True, exist_ok=True)
images.mkdir(exist_ok=True)
for img in IMG_PATH.glob('*.jpg'):
img.rename(images / img.name)
for anno in IMG_PATH.glob('*.xml'):
anno.rename(annos / anno.name)
labels = VOC_PATH / 'labels.txt'
labels.write_text('\n'.join(CLASSES))
train_file = datasets / 'train.txt'
val_file = datasets / 'val.txt'
train_val_file = datasets / 'trainval.txt'
test_file = datasets / 'test.txt'
train_ratio = 0.7
val_ratio = 0.1
total = list(annos.glob('*.xml'))
random.shuffle(total)
total_nums = len(total)
train_num = int(total_nums * train_ratio)
val_num = int(total_nums * val_ratio)
train = total[:train_num]
val = total[train_num:train_num + val_num]
test = total[train_num + val_num:]
# use .stem, not rstrip('.xml'): rstrip strips a character *set*, which would
# mangle any stem that happens to end in 'x', 'm' or 'l'
train = '\n'.join([i.stem for i in train])
val = '\n'.join([i.stem for i in val])
test = '\n'.join([i.stem for i in test])
train_file.write_text(train)
val_file.write_text(val)
test_file.write_text(test)
train_val_file.write_text(train + '\n' + val)
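
With train_ratio 0.7 and val_ratio 0.1, the remainder becomes the test split; for example, 1000 annotations give:

    train_num = int(1000 * 0.7)  # 700
    val_num = int(1000 * 0.1)    # 100
    # test gets the remaining 200; trainval.txt (train + val) is what VOCDataset reads for training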

322 src/3_train_ssd.py Normal file

@@ -0,0 +1,322 @@
import argparse
import itertools
import logging
import os
import sys
import torch
from torch.optim.lr_scheduler import CosineAnnealingLR, MultiStepLR
from torch.utils.data import DataLoader, ConcatDataset
from config import VOC_PATH, MODEL_PATH
from vision.datasets.voc_dataset import VOCDataset
from vision.nn.multibox_loss import MultiboxLoss
from vision.ssd.config import mobilenetv1_ssd_config
from vision.ssd.config import squeezenet_ssd_config
from vision.ssd.config import vgg_ssd_config
from vision.ssd.data_preprocessing import TrainAugmentation, TestTransform
from vision.ssd.mobilenet_v2_ssd_lite import create_mobilenetv2_ssd_lite
from vision.ssd.mobilenetv1_ssd import create_mobilenetv1_ssd
from vision.ssd.mobilenetv1_ssd_lite import create_mobilenetv1_ssd_lite
from vision.ssd.squeezenet_ssd_lite import create_squeezenet_ssd_lite
from vision.ssd.ssd import MatchPrior
from vision.ssd.vgg_ssd import create_vgg_ssd
from vision.utils.misc import str2bool, Timer, freeze_net_layers, store_labels
parser = argparse.ArgumentParser(
description='Single Shot MultiBox Detector Training With PyTorch')
parser.add_argument("--dataset-type", default="voc", type=str,
help='Specify dataset type. Currently supports voc and open_images.')
parser.add_argument('--datasets', '--data', nargs='+', default=[str(VOC_PATH)], help='Dataset directory path')
parser.add_argument('--balance-data', action='store_true',
help="Balance training data by down-sampling more frequent labels.")
parser.add_argument('--net', default="mb1-ssd",
help="The network architecture, it can be mb1-ssd, mb1-lite-ssd, mb2-ssd-lite or vgg16-ssd.")
parser.add_argument('--freeze-base-net', action='store_true',
help="Freeze base net layers.")
parser.add_argument('--freeze-net', action='store_true',
help="Freeze all the layers except the prediction head.")
parser.add_argument('--mb2-width-mult', default=1.0, type=float,
help='Width Multiplier for MobileNetV2')
# Params for loading pretrained basenet or checkpoints.
parser.add_argument('--base-net', help='Pretrained base model')
parser.add_argument('--pretrained-ssd', default=str(MODEL_PATH) + '/mobilenet-v1-ssd-mp-0_675.pth', type=str,
help='Pre-trained base model')
parser.add_argument('--resume', default=None, type=str,
help='Checkpoint state_dict file to resume training from')
# Params for SGD
parser.add_argument('--lr', '--learning-rate', default=0.01, type=float,
help='initial learning rate')
parser.add_argument('--momentum', default=0.9, type=float,
help='Momentum value for optim')
parser.add_argument('--weight-decay', default=5e-4, type=float,
help='Weight decay for SGD')
parser.add_argument('--gamma', default=0.1, type=float,
help='Gamma update for SGD')
parser.add_argument('--base-net-lr', default=0.001, type=float,
help='initial learning rate for base net, or None to use --lr')
parser.add_argument('--extra-layers-lr', default=None, type=float,
help='initial learning rate for the layers not in base net and prediction heads.')
# Scheduler
parser.add_argument('--scheduler', default="cosine", type=str,
help="Scheduler for SGD. It can one of multi-step and cosine")
# Params for Multi-step Scheduler
parser.add_argument('--milestones', default="80,100", type=str,
help="milestones for MultiStepLR")
# Params for Cosine Annealing
parser.add_argument('--t-max', default=100, type=float,
help='T_max value for Cosine Annealing Scheduler.')
# Train params
parser.add_argument('--batch-size', default=16, type=int,
help='Batch size for training')
parser.add_argument('--num-epochs', '--epochs', default=100, type=int,
help='the number of epochs to train')
parser.add_argument('--num-workers', '--workers', default=0, type=int,
help='Number of workers used in dataloading')
parser.add_argument('--validation-epochs', default=1, type=int,
help='the number of epochs between validation runs')
parser.add_argument('--debug-steps', default=10, type=int,
help='Set the debug log output frequency.')
parser.add_argument('--use-cuda', default=True, type=str2bool,
help='Use CUDA to train model')
parser.add_argument('--checkpoint-folder', '--model-dir', default=str(MODEL_PATH),
help='Directory for saving checkpoint models')
logging.basicConfig(stream=sys.stdout, level=logging.INFO,
format='%(asctime)s - %(message)s', datefmt="%Y-%m-%d %H:%M:%S")
args = parser.parse_args()
DEVICE = torch.device("cuda:0" if torch.cuda.is_available() and args.use_cuda else "cpu")
if args.use_cuda and torch.cuda.is_available():
torch.backends.cudnn.benchmark = True
logging.info("Using CUDA...")
def train(loader, net, criterion, optimizer, device, debug_steps=100, epoch=-1):
net.train(True)
running_loss = 0.0
running_regression_loss = 0.0
running_classification_loss = 0.0
for i, data in enumerate(loader):
images, boxes, labels = data
images = images.to(device)
boxes = boxes.to(device)
labels = labels.to(device)
optimizer.zero_grad()
confidence, locations = net(images)
regression_loss, classification_loss = criterion(confidence, locations, labels, boxes)
loss = regression_loss + classification_loss
loss.backward()
optimizer.step()
running_loss += loss.item()
running_regression_loss += regression_loss.item()
running_classification_loss += classification_loss.item()
if i and i % debug_steps == 0:
avg_loss = running_loss / debug_steps
avg_reg_loss = running_regression_loss / debug_steps
avg_clf_loss = running_classification_loss / debug_steps
logging.info(
f"Epoch: {epoch}, Step: {i}/{len(loader)}, " +
f"Avg Loss: {avg_loss:.4f}, " +
f"Avg Regression Loss {avg_reg_loss:.4f}, " +
f"Avg Classification Loss: {avg_clf_loss:.4f}"
)
running_loss = 0.0
running_regression_loss = 0.0
running_classification_loss = 0.0
def test(loader, net, criterion, device):
net.eval()
running_loss = 0.0
running_regression_loss = 0.0
running_classification_loss = 0.0
num = 0
for _, data in enumerate(loader):
images, boxes, labels = data
images = images.to(device)
boxes = boxes.to(device)
labels = labels.to(device)
num += 1
with torch.no_grad():
confidence, locations = net(images)
regression_loss, classification_loss = criterion(confidence, locations, labels, boxes)
loss = regression_loss + classification_loss
running_loss += loss.item()
running_regression_loss += regression_loss.item()
running_classification_loss += classification_loss.item()
return running_loss / num, running_regression_loss / num, running_classification_loss / num
if __name__ == '__main__':
timer = Timer()
logging.info(args)
if args.checkpoint_folder:
args.checkpoint_folder = os.path.expanduser(args.checkpoint_folder)
if not os.path.exists(args.checkpoint_folder):
os.mkdir(args.checkpoint_folder)
if args.net == 'vgg16-ssd':
create_net = create_vgg_ssd
config = vgg_ssd_config
elif args.net == 'mb1-ssd':
create_net = create_mobilenetv1_ssd
config = mobilenetv1_ssd_config
elif args.net == 'mb1-ssd-lite':
create_net = create_mobilenetv1_ssd_lite
config = mobilenetv1_ssd_config
elif args.net == 'sq-ssd-lite':
create_net = create_squeezenet_ssd_lite
config = squeezenet_ssd_config
elif args.net == 'mb2-ssd-lite':
create_net = lambda num: create_mobilenetv2_ssd_lite(num, width_mult=args.mb2_width_mult)
config = mobilenetv1_ssd_config
else:
logging.fatal("The net type is wrong.")
parser.print_help(sys.stderr)
sys.exit(1)
train_transform = TrainAugmentation(config.image_size, config.image_mean, config.image_std)
target_transform = MatchPrior(config.priors, config.center_variance,
config.size_variance, 0.5)
test_transform = TestTransform(config.image_size, config.image_mean, config.image_std)
logging.info("Prepare training datasets.")
datasets = []
for dataset_path in args.datasets:
dataset = VOCDataset(dataset_path, transform=train_transform,
target_transform=target_transform)
label_file = os.path.join(args.checkpoint_folder, "labels.txt")
store_labels(label_file, dataset.class_names)
num_classes = len(dataset.class_names)
datasets.append(dataset)
logging.info(f"Stored labels into file {label_file}.")
train_dataset = ConcatDataset(datasets)
logging.info("Train dataset size: {}".format(len(train_dataset)))
train_loader = DataLoader(train_dataset, args.batch_size,
num_workers=args.num_workers,
shuffle=True)
logging.info("Prepare Validation datasets.")
val_dataset = VOCDataset(dataset_path, transform=test_transform,
target_transform=target_transform, is_test=True)
logging.info("Validation dataset size: {}".format(len(val_dataset)))
val_loader = DataLoader(val_dataset, args.batch_size,
num_workers=args.num_workers,
shuffle=False)
logging.info("Build network.")
net = create_net(num_classes)
min_loss = -10000.0
last_epoch = -1
base_net_lr = args.base_net_lr if args.base_net_lr is not None else args.lr
extra_layers_lr = args.extra_layers_lr if args.extra_layers_lr is not None else args.lr
if args.freeze_base_net:
logging.info("Freeze base net.")
freeze_net_layers(net.base_net)
params = itertools.chain(net.source_layer_add_ons.parameters(), net.extras.parameters(),
net.regression_headers.parameters(), net.classification_headers.parameters())
params = [
{'params': itertools.chain(
net.source_layer_add_ons.parameters(),
net.extras.parameters()
), 'lr': extra_layers_lr},
{'params': itertools.chain(
net.regression_headers.parameters(),
net.classification_headers.parameters()
)}
]
elif args.freeze_net:
freeze_net_layers(net.base_net)
freeze_net_layers(net.source_layer_add_ons)
freeze_net_layers(net.extras)
params = itertools.chain(net.regression_headers.parameters(), net.classification_headers.parameters())
logging.info("Freeze all the layers except prediction heads.")
else:
params = [
{'params': net.base_net.parameters(), 'lr': base_net_lr},
{'params': itertools.chain(
net.source_layer_add_ons.parameters(),
net.extras.parameters()
), 'lr': extra_layers_lr},
{'params': itertools.chain(
net.regression_headers.parameters(),
net.classification_headers.parameters()
)}
]
# load a previous model checkpoint (if requested)
timer.start("Load Model")
if args.resume:
logging.info(f"Resume from the model {args.resume}")
net.load(args.resume)
elif args.base_net:
logging.info(f"Init from base net {args.base_net}")
net.init_from_base_net(args.base_net)
elif args.pretrained_ssd:
logging.info(f"Init from pretrained ssd {args.pretrained_ssd}")
net.init_from_pretrained_ssd(args.pretrained_ssd)
logging.info(f'Took {timer.end("Load Model"):.2f} seconds to load the model.')
net.to(DEVICE)
criterion = MultiboxLoss(config.priors, iou_threshold=0.5, neg_pos_ratio=3,
center_variance=0.1, size_variance=0.2, device=DEVICE)
optimizer = torch.optim.SGD(params, lr=args.lr, momentum=args.momentum,
weight_decay=args.weight_decay)
logging.info(f"Learning rate: {args.lr}, Base net learning rate: {base_net_lr}, "
+ f"Extra Layers learning rate: {extra_layers_lr}.")
if args.scheduler == 'multi-step':
logging.info("Uses MultiStepLR scheduler.")
milestones = [int(v.strip()) for v in args.milestones.split(",")]
scheduler = MultiStepLR(optimizer, milestones=milestones,
gamma=0.1, last_epoch=last_epoch)
elif args.scheduler == 'cosine':
logging.info("Uses CosineAnnealingLR scheduler.")
scheduler = CosineAnnealingLR(optimizer, args.t_max, last_epoch=last_epoch)
else:
logging.fatal(f"Unsupported Scheduler: {args.scheduler}.")
parser.print_help(sys.stderr)
sys.exit(1)
logging.info(f"Start training from epoch {last_epoch + 1}.")
for epoch in range(last_epoch + 1, args.num_epochs):
train(train_loader, net, criterion, optimizer,
device=DEVICE, debug_steps=args.debug_steps, epoch=epoch)
scheduler.step()
if epoch % args.validation_epochs == 0 or epoch == args.num_epochs - 1:
val_loss, val_regression_loss, val_classification_loss = test(val_loader, net, criterion, DEVICE)
logging.info(
f"Epoch: {epoch}, " +
f"Validation Loss: {val_loss:.4f}, " +
f"Validation Regression Loss {val_regression_loss:.4f}, " +
f"Validation Classification Loss: {val_classification_loss:.4f}"
)
model_path = os.path.join(args.checkpoint_folder, f"{args.net}-Epoch-{epoch}-Loss-{val_loss}.pth")
net.save(model_path)
logging.info(f"Saved model {model_path}")
logging.info("Task done, exiting program.")

219 src/4_eval_ssd.py Normal file

@@ -0,0 +1,219 @@
import argparse
import logging
import pathlib
import sys
import numpy as np
import torch
from config import MODEL_PATH, VOC_PATH, MODEL_NAME
from vision.datasets.open_images import OpenImagesDataset
from vision.datasets.voc_dataset import VOCDataset
from vision.ssd.mobilenet_v2_ssd_lite import create_mobilenetv2_ssd_lite, create_mobilenetv2_ssd_lite_predictor
from vision.ssd.mobilenetv1_ssd import create_mobilenetv1_ssd, create_mobilenetv1_ssd_predictor
from vision.ssd.mobilenetv1_ssd_lite import create_mobilenetv1_ssd_lite, create_mobilenetv1_ssd_lite_predictor
from vision.ssd.squeezenet_ssd_lite import create_squeezenet_ssd_lite, create_squeezenet_ssd_lite_predictor
from vision.ssd.vgg_ssd import create_vgg_ssd, create_vgg_ssd_predictor
from vision.utils import box_utils, measurements
from vision.utils import str2bool, Timer
parser = argparse.ArgumentParser(description="SSD Evaluation on VOC Dataset.")
parser.add_argument('--net', default='mb1-ssd',
help="The network architecture, it should be of mb1-ssd, mb1-ssd-lite, mb2-ssd-lite or vgg16-ssd.")
parser.add_argument("--trained_model", type=str,
default='/Users/licsber/datasets/工训赛/models/' + MODEL_NAME)
parser.add_argument("--dataset_type", default="voc", type=str,
help='Specify dataset type. Currently support voc and open_images.')
parser.add_argument("--dataset", type=str, help="The root directory of the VOC dataset or Open Images dataset.",
default=str(VOC_PATH))
parser.add_argument("--label_file", type=str, help="The label file path.", default=str(MODEL_PATH) + '/labels.txt')
parser.add_argument("--use_cuda", type=str2bool, default=True)
parser.add_argument("--use_2007_metric", type=str2bool, default=True)
parser.add_argument("--nms_method", type=str, default="hard")
parser.add_argument("--iou_threshold", type=float, default=0.5, help="The threshold of Intersection over Union.")
parser.add_argument("--eval_dir", default="eval_results", type=str, help="The directory to store evaluation results.")
parser.add_argument('--mb2_width_mult', default=1.0, type=float,
help='Width Multiplier for MobileNetV2')
args = parser.parse_args()
DEVICE = torch.device("cuda:0" if torch.cuda.is_available() and args.use_cuda else "cpu")
def group_annotation_by_class(dataset):
true_case_stat = {}
all_gt_boxes = {}
all_difficult_cases = {}
for i in range(len(dataset)):
image_id, annotation = dataset.get_annotation(i)
gt_boxes, classes, is_difficult = annotation
gt_boxes = torch.from_numpy(gt_boxes)
for i, difficult in enumerate(is_difficult):
class_index = int(classes[i])
gt_box = gt_boxes[i]
if not difficult:
true_case_stat[class_index] = true_case_stat.get(class_index, 0) + 1
if class_index not in all_gt_boxes:
all_gt_boxes[class_index] = {}
if image_id not in all_gt_boxes[class_index]:
all_gt_boxes[class_index][image_id] = []
all_gt_boxes[class_index][image_id].append(gt_box)
if class_index not in all_difficult_cases:
all_difficult_cases[class_index] = {}
if image_id not in all_difficult_cases[class_index]:
all_difficult_cases[class_index][image_id] = []
all_difficult_cases[class_index][image_id].append(difficult)
for class_index in all_gt_boxes:
for image_id in all_gt_boxes[class_index]:
all_gt_boxes[class_index][image_id] = torch.stack(all_gt_boxes[class_index][image_id])
    for class_index in all_difficult_cases:
        for image_id in all_difficult_cases[class_index]:
            # fixed: this previously assigned into all_gt_boxes; it is the difficult flags being converted here
            all_difficult_cases[class_index][image_id] = torch.tensor(all_difficult_cases[class_index][image_id])
return true_case_stat, all_gt_boxes, all_difficult_cases
def compute_average_precision_per_class(num_true_cases, gt_boxes, difficult_cases,
prediction_file, iou_threshold, use_2007_metric):
with open(prediction_file) as f:
image_ids = []
boxes = []
scores = []
for line in f:
t = line.rstrip().split("\t")
image_ids.append(t[0])
scores.append(float(t[1]))
box = torch.tensor([float(v) for v in t[2:]]).unsqueeze(0)
box -= 1.0 # convert to python format where indexes start from 0
boxes.append(box)
scores = np.array(scores)
sorted_indexes = np.argsort(-scores)
boxes = [boxes[i] for i in sorted_indexes]
image_ids = [image_ids[i] for i in sorted_indexes]
true_positive = np.zeros(len(image_ids))
false_positive = np.zeros(len(image_ids))
matched = set()
for i, image_id in enumerate(image_ids):
box = boxes[i]
if image_id not in gt_boxes:
false_positive[i] = 1
continue
gt_box = gt_boxes[image_id]
ious = box_utils.iou_of(box, gt_box)
max_iou = torch.max(ious).item()
max_arg = torch.argmax(ious).item()
if max_iou > iou_threshold:
if difficult_cases[image_id][max_arg] == 0:
if (image_id, max_arg) not in matched:
true_positive[i] = 1
matched.add((image_id, max_arg))
else:
false_positive[i] = 1
else:
false_positive[i] = 1
true_positive = true_positive.cumsum()
false_positive = false_positive.cumsum()
precision = true_positive / (true_positive + false_positive)
recall = true_positive / num_true_cases
if use_2007_metric:
return measurements.compute_voc2007_average_precision(precision, recall)
else:
return measurements.compute_average_precision(precision, recall)
if __name__ == '__main__':
eval_path = pathlib.Path(args.eval_dir)
eval_path.mkdir(exist_ok=True)
timer = Timer()
class_names = [name.strip() for name in open(args.label_file).readlines()]
if args.dataset_type == "voc":
dataset = VOCDataset(args.dataset, is_test=True)
elif args.dataset_type == 'open_images':
dataset = OpenImagesDataset(args.dataset, dataset_type="test")
true_case_stat, all_gb_boxes, all_difficult_cases = group_annotation_by_class(dataset)
if args.net == 'vgg16-ssd':
net = create_vgg_ssd(len(class_names), is_test=True)
elif args.net == 'mb1-ssd':
net = create_mobilenetv1_ssd(len(class_names), is_test=True)
elif args.net == 'mb1-ssd-lite':
net = create_mobilenetv1_ssd_lite(len(class_names), is_test=True)
elif args.net == 'sq-ssd-lite':
net = create_squeezenet_ssd_lite(len(class_names), is_test=True)
elif args.net == 'mb2-ssd-lite':
net = create_mobilenetv2_ssd_lite(len(class_names), width_mult=args.mb2_width_mult, is_test=True)
else:
logging.fatal("The net type is wrong. It should be one of vgg16-ssd, mb1-ssd and mb1-ssd-lite.")
parser.print_help(sys.stderr)
sys.exit(1)
timer.start("Load Model")
net.load(args.trained_model)
net = net.to(DEVICE)
print(f'It took {timer.end("Load Model")} seconds to load the model.')
if args.net == 'vgg16-ssd':
predictor = create_vgg_ssd_predictor(net, nms_method=args.nms_method, device=DEVICE)
elif args.net == 'mb1-ssd':
predictor = create_mobilenetv1_ssd_predictor(net, nms_method=args.nms_method, device=DEVICE)
elif args.net == 'mb1-ssd-lite':
predictor = create_mobilenetv1_ssd_lite_predictor(net, nms_method=args.nms_method, device=DEVICE)
elif args.net == 'sq-ssd-lite':
predictor = create_squeezenet_ssd_lite_predictor(net, nms_method=args.nms_method, device=DEVICE)
elif args.net == 'mb2-ssd-lite':
predictor = create_mobilenetv2_ssd_lite_predictor(net, nms_method=args.nms_method, device=DEVICE)
else:
logging.fatal("The net type is wrong. It should be one of vgg16-ssd, mb1-ssd and mb1-ssd-lite.")
parser.print_help(sys.stderr)
sys.exit(1)
results = []
for i in range(len(dataset)):
print("process image", i)
timer.start("Load Image")
image = dataset.get_image(i)
print("Load Image: {:4f} seconds.".format(timer.end("Load Image")))
timer.start("Predict")
boxes, labels, probs = predictor.predict(image)
print("Prediction: {:4f} seconds.".format(timer.end("Predict")))
indexes = torch.ones(labels.size(0), 1, dtype=torch.float32) * i
results.append(torch.cat([
indexes.reshape(-1, 1),
labels.reshape(-1, 1).float(),
probs.reshape(-1, 1),
boxes + 1.0 # matlab's indexes start from 1
], dim=1))
results = torch.cat(results)
for class_index, class_name in enumerate(class_names):
if class_index == 0: continue # ignore background
prediction_path = eval_path / f"det_test_{class_name}.txt"
with open(prediction_path, "w") as f:
sub = results[results[:, 1] == class_index, :]
for i in range(sub.size(0)):
prob_box = sub[i, 2:].numpy()
image_id = dataset.ids[int(sub[i, 0])]
print(
image_id + "\t" + " ".join([str(v) for v in prob_box]).replace(" ", "\t"),
file=f
)
aps = []
print("\n\nAverage Precision Per-class:")
for class_index, class_name in enumerate(class_names):
if class_index == 0:
continue
prediction_path = eval_path / f"det_test_{class_name}.txt"
ap = compute_average_precision_per_class(
true_case_stat[class_index],
all_gb_boxes[class_index],
all_difficult_cases[class_index],
prediction_path,
args.iou_threshold,
args.use_2007_metric
)
aps.append(ap)
print(f"{class_name}: {ap}")
print(f"\nAverage Precision Across All Classes: {sum(aps) / len(aps)}")

42 src/5_video_test.py Normal file

@@ -0,0 +1,42 @@
import cv2
from config import MODEL_PATH, VIDEO_PATH, LABEL_PATH, MODEL_NAME
from vision.ssd.mobilenetv1_ssd import create_mobilenetv1_ssd, create_mobilenetv1_ssd_predictor
train_model = MODEL_PATH / MODEL_NAME
test_videos = VIDEO_PATH.glob('*.avi')
class_names = [name.strip() for name in LABEL_PATH.read_text().split()]
net = create_mobilenetv1_ssd(len(class_names), is_test=True)
net.load(train_model)
predictor = create_mobilenetv1_ssd_predictor(net, nms_method='hard')
count = 0
for video in test_videos:
count += 1
cap = cv2.VideoCapture(str(video))
if count != 1:
continue
while True:
suc, bgr = cap.read()
if not suc:
break
rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
boxes, labels, probs = predictor.predict(rgb, 5, 0.4)
for i in range(boxes.size(0)):
box = boxes[i, :]
if box[0] <= 0 or box[1] <= 0 or box[3] >= 240:
continue
label = f"{class_names[labels[i]]}: {probs[i]:.2f}"
print(label)
cv2.rectangle(bgr, (int(box[0]), int(box[1])), (int(box[2]), int(box[3])), (255, 255, 0), 4)
cv2.putText(bgr, label, (int(box[0]) + 20, int(box[1]) + 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 255), 2)
cv2.imshow('bgr', bgr)
cv2.waitKey(1)
break
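
Note: the box[0] <= 0 / box[1] <= 0 / box[3] >= 240 filter drops detections touching the frame border, which presumably assumes 320x240 capture frames; the thresholds would need adjusting for other resolutions.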

107 src/6_onnx_export.py Normal file

@@ -0,0 +1,107 @@
import argparse
import os
import sys
import torch.onnx
from config import MODEL_PATH, MAC, MODEL_NAME
from vision.ssd.mobilenet_v2_ssd_lite import create_mobilenetv2_ssd_lite
from vision.ssd.mobilenetv1_ssd import create_mobilenetv1_ssd
from vision.ssd.mobilenetv1_ssd_lite import create_mobilenetv1_ssd_lite
from vision.ssd.squeezenet_ssd_lite import create_squeezenet_ssd_lite
from vision.ssd.vgg_ssd import create_vgg_ssd
parser = argparse.ArgumentParser()
parser.add_argument('--net', default='ssd-mobilenet',
help="The network architecture, it can be mb1-ssd (aka ssd-mobilenet), mb1-lite-ssd, mb2-ssd-lite or vgg16-ssd.")
parser.add_argument('--input', type=str, default=str(MODEL_PATH / MODEL_NAME),
help="path to input PyTorch model (.pth checkpoint)")
parser.add_argument('--output', type=str, default='', help="desired path of converted ONNX model (default: <NET>.onnx)")
parser.add_argument('--labels', type=str, default=str(MODEL_PATH) + '/labels.txt', help="name of the class labels file")
parser.add_argument('--width', type=int, default=300, help="input width of the model to be exported (in pixels)")
parser.add_argument('--height', type=int, default=300, help="input height of the model to be exported (in pixels)")
parser.add_argument('--batch-size', type=int, default=1, help="batch size of the model to be exported (default=1)")
parser.add_argument('--model-dir', type=str, default=str(MODEL_PATH),
help="directory to look for the input PyTorch model in, and export the converted ONNX model to (if --output doesn't specify a directory)")
args = parser.parse_args()
print(args)
# set the device
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print('running on device ' + str(device))
# format input model paths
if args.model_dir:
args.model_dir = os.path.expanduser(args.model_dir)
# find the checkpoint with the lowest loss
if not args.input:
best_loss = 10000
for file in os.listdir(args.model_dir):
if not file.endswith(".pth"):
continue
try:
loss = float(file[file.rfind("-") + 1:len(file) - 4])
if loss < best_loss:
best_loss = loss
args.input = os.path.join(args.model_dir, file)
except ValueError:
continue
print('found best checkpoint with loss {:f} ({:s})'.format(best_loss, args.input))
# append the model dir (if needed)
if not os.path.isfile(args.input):
args.input = os.path.join(args.model_dir, args.input)
if not os.path.isfile(args.labels):
args.labels = os.path.join(args.model_dir, args.labels)
# determine the number of classes
class_names = [name.strip() for name in open(args.labels).readlines()]
num_classes = len(class_names)
# construct the network architecture
print('creating network: ' + args.net)
print('num classes: ' + str(num_classes))
if args.net == 'vgg16-ssd':
net = create_vgg_ssd(len(class_names), is_test=True)
elif args.net == 'mb1-ssd' or args.net == 'ssd-mobilenet':
net = create_mobilenetv1_ssd(len(class_names), is_test=True)
elif args.net == 'mb1-ssd-lite':
net = create_mobilenetv1_ssd_lite(len(class_names), is_test=True)
elif args.net == 'mb2-ssd-lite':
net = create_mobilenetv2_ssd_lite(len(class_names), is_test=True)
elif args.net == 'sq-ssd-lite':
net = create_squeezenet_ssd_lite(len(class_names), is_test=True)
else:
print("The net type is wrong. It should be one of vgg16-ssd, mb1-ssd and mb1-ssd-lite.")
sys.exit(1)
# load the model checkpoint
print('loading checkpoint: ' + args.input)
net.load(args.input)
net.to(device)
net.eval()
if MAC:
dummy_input = torch.randn(args.batch_size, 3, args.height, args.width)
else:
dummy_input = torch.randn(args.batch_size, 3, args.height, args.width).cuda()
# format output model path
if not args.output:
args.output = args.net + '.onnx'
if args.model_dir and args.output.find('/') == -1 and args.output.find('\\') == -1:
args.output = os.path.join(args.model_dir, args.output)
input_names = ['input_0']
output_names = ['scores', 'boxes']
print('exporting model to ONNX...')
torch.onnx.export(net, dummy_input, args.output, verbose=True, input_names=input_names, output_names=output_names)
print('model exported to: {:s}'.format(args.output))
print('task done, exiting program')
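
A typical export under the defaults above, followed by the sanity check in 7_onnx_test.py (illustrative):

    $PY src/6_onnx_export.py --net ssd-mobilenet --width 300 --height 300
    $PY src/7_onnx_test.py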

9 src/7_onnx_test.py Normal file

@@ -0,0 +1,9 @@
import onnx
from config import MODEL_PATH
model_path = MODEL_PATH / 'ssd-mobilenet.onnx'
model = onnx.load(str(model_path))
print(onnx.checker.check_model(model))
print(onnx.helper.printable_graph(model.graph))

12 src/8_merge_voc.py Normal file

@@ -0,0 +1,12 @@
from config import IMG_PATH
all_img = list(IMG_PATH.glob('*.jpg'))
all_img.sort()
count = 0
for img in all_img:
xml = IMG_PATH / img.name.replace('jpg', 'xml')
new_file_basename = f"{count:05d}"
xml.rename(xml.parent / (new_file_basename + '.xml'))
img.rename(img.parent / (new_file_basename + '.jpg'))
count += 1

27 src/config.py Normal file

@@ -0,0 +1,27 @@
import sys
from pathlib import Path
from licsber.dl import DATASETS_ROOT
CLASSES = (
'circle',
'square',
'huan',
)
MODEL_NAME = 'mb1-ssd-Epoch-29-Loss-1.1743878581944633.pth'
MAC = sys.platform == 'darwin'
VIDEO_PATH = DATASETS_ROOT / '工训赛/video'
VIDEO_PATH = Path(VIDEO_PATH)
IMG_PATH = VIDEO_PATH.parent / 'labeled'
IMG_PATH.mkdir(exist_ok=True)
MODEL_PATH = VIDEO_PATH.parent / 'models'
MODEL_PATH.mkdir(exist_ok=True)
VOC_PATH = VIDEO_PATH.parent / 'voc'
VOC_PATH.mkdir(exist_ok=True)
LABEL_PATH = MODEL_PATH / 'labels.txt'
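
MODEL_NAME follows the checkpoint naming scheme in 3_train_ssd.py (f"{net}-Epoch-{epoch}-Loss-{loss}.pth"), so after a training run it is the only constant that needs updating for 4_eval_ssd.py and 5_video_test.py to pick up the new weights.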

25 src/main.py Normal file

@@ -0,0 +1,25 @@
# noinspection PyUnresolvedReferences
import jetson.inference
import jetson.utils
net = jetson.inference.detectNet(argv=[
'--model=../ssd-mobilenet.onnx',
'--labels=../labels.txt',
'--input-blob=input_0',
'--output-cvg=scores',
'--output-bbox=boxes',
],
threshold=0.5)
input = jetson.utils.videoSource('/dev/video0')
count = 0
while True:
count += 1
img = input.Capture()
detections = net.Detect(img, overlay='box,labels,conf')
print("detected {:d} objects in image".format(len(detections)))
for detection in detections:
print(detection)
if count >= 100:
break
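
main.py only prints detections; rendering them would use jetson.utils.videoOutput (a sketch assuming the standard jetson-inference API; net.Detect already draws the overlay into img):

    output = jetson.utils.videoOutput('display://0')
    # inside the loop, after net.Detect(img, ...):
    output.Render(img)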

0 src/vision/__init__.py Normal file

@@ -0,0 +1,31 @@
import numpy as np
import torch
def object_detection_collate(batch):
images = []
gt_boxes = []
gt_labels = []
image_type = type(batch[0][0])
box_type = type(batch[0][1])
label_type = type(batch[0][2])
for image, boxes, labels in batch:
if image_type is np.ndarray:
images.append(torch.from_numpy(image))
elif image_type is torch.Tensor:
images.append(image)
else:
raise TypeError(f"Image should be tensor or np.ndarray, but got {image_type}.")
if box_type is np.ndarray:
gt_boxes.append(torch.from_numpy(boxes))
elif box_type is torch.Tensor:
gt_boxes.append(boxes)
else:
raise TypeError(f"Boxes should be tensor or np.ndarray, but got {box_type}.")
if label_type is np.ndarray:
gt_labels.append(torch.from_numpy(labels))
elif label_type is torch.Tensor:
gt_labels.append(labels)
else:
raise TypeError(f"Labels should be tensor or np.ndarray, but got {label_type}.")
return torch.stack(images), gt_boxes, gt_labels
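
object_detection_collate stacks the images but leaves boxes and labels as ragged per-image lists, so it would be wired in via DataLoader's collate_fn (a sketch; 3_train_ssd.py instead fixes the box count per image with MatchPrior and keeps the default collate):

    loader = DataLoader(dataset, batch_size=16, shuffle=True, collate_fn=object_detection_collate)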

@@ -0,0 +1,128 @@
import os
import sys
import xml.etree.ElementTree as ET
from random import random
def main(filename):
# ratio to divide up the images
train = 0.7
val = 0.2
test = 0.1
    # avoid exact float equality: 0.7 + 0.1 + 0.2 != 1.0 in binary floating point
    if abs(train + test + val - 1.0) > 1e-9:
        print("probabilities must equal 1")
        exit()
# get the labels
labels = []
imgnames = []
annotations = {}
with open(filename, 'r') as labelfile:
label_string = ""
for line in labelfile:
label_string += line.rstrip()
labels = label_string.split(',')
labels = [elem.replace(" ", "") for elem in labels]
# get image names
for filename in os.listdir("./JPEGImages"):
if filename.endswith(".jpg"):
            img = filename[:-len('.jpg')]  # not rstrip('.jpg'): that strips a character set, not a suffix
imgnames.append(img)
print("Labels:", labels, "imgcnt:", len(imgnames))
# initialise annotation list
for label in labels:
annotations[label] = []
# Scan the annotations for the labels
for img in imgnames:
annote = "Annotations/" + img + '.xml'
if os.path.isfile(annote):
tree = ET.parse(annote)
root = tree.getroot()
annote_labels = []
for labelname in root.findall('*/name'):
labelname = labelname.text
annote_labels.append(labelname)
if labelname in labels:
annotations[labelname].append(img)
annotations[img] = annote_labels
else:
print("Missing annotation for ", annote)
exit()
# divvy up the images to the different sets
sampler = imgnames.copy()
train_list = []
val_list = []
test_list = []
while len(sampler) > 0:
dice = random()
elem = sampler.pop()
if dice <= test:
test_list.append(elem)
elif dice <= (test + val):
val_list.append(elem)
else:
train_list.append(elem)
print("Training set:", len(train_list), "validation set:", len(val_list), "test set:", len(test_list))
# create the dataset files
create_folder("./ImageSets/Main/")
with open("./ImageSets/Main/train.txt", 'w') as outfile:
for name in train_list:
outfile.write(name + "\n")
with open("./ImageSets/Main/val.txt", 'w') as outfile:
for name in val_list:
outfile.write(name + "\n")
with open("./ImageSets/Main/trainval.txt", 'w') as outfile:
for name in train_list:
outfile.write(name + "\n")
for name in val_list:
outfile.write(name + "\n")
with open("./ImageSets/Main/test.txt", 'w') as outfile:
for name in test_list:
outfile.write(name + "\n")
    # create the individual files for each label
for label in labels:
with open("./ImageSets/Main/" + label + "_train.txt", 'w') as outfile:
for name in train_list:
if label in annotations[name]:
outfile.write(name + " 1\n")
else:
outfile.write(name + " -1\n")
with open("./ImageSets/Main/" + label + "_val.txt", 'w') as outfile:
for name in val_list:
if label in annotations[name]:
outfile.write(name + " 1\n")
else:
outfile.write(name + " -1\n")
with open("./ImageSets/Main/" + label + "_test.txt", 'w') as outfile:
for name in test_list:
if label in annotations[name]:
outfile.write(name + " 1\n")
else:
outfile.write(name + " -1\n")
def create_folder(foldername):
if os.path.exists(foldername):
print('folder already exists:', foldername)
else:
os.makedirs(foldername)
if __name__ == '__main__':
if len(sys.argv) < 2:
print("usage: python generate_vocdata.py <labelfile>")
exit()
main(sys.argv[1])

130 src/vision/datasets/open_images.py Normal file

@@ -0,0 +1,130 @@
import copy
import logging
import os
import pathlib
import cv2
import numpy as np
import pandas as pd
class OpenImagesDataset:
def __init__(self, root,
transform=None, target_transform=None,
dataset_type="train", balance_data=False):
self.root = pathlib.Path(root)
self.transform = transform
self.target_transform = target_transform
self.dataset_type = dataset_type.lower()
self.data, self.class_names, self.class_dict = self._read_data()
self.balance_data = balance_data
self.min_image_num = -1
if self.balance_data:
self.data = self._balance_data()
self.ids = [info['image_id'] for info in self.data]
self.class_stat = None
def _getitem(self, index):
image_info = self.data[index]
image = self._read_image(image_info['image_id'])
# duplicate boxes to prevent corruption of dataset
boxes = copy.copy(image_info['boxes'])
boxes[:, 0] *= image.shape[1]
boxes[:, 1] *= image.shape[0]
boxes[:, 2] *= image.shape[1]
boxes[:, 3] *= image.shape[0]
# duplicate labels to prevent corruption of dataset
labels = copy.copy(image_info['labels'])
if self.transform:
image, boxes, labels = self.transform(image, boxes, labels)
if self.target_transform:
boxes, labels = self.target_transform(boxes, labels)
return image_info['image_id'], image, boxes, labels
def __getitem__(self, index):
_, image, boxes, labels = self._getitem(index)
return image, boxes, labels
def get_annotation(self, index):
"""To conform the eval_ssd implementation that is based on the VOC dataset."""
image_id, image, boxes, labels = self._getitem(index)
is_difficult = np.zeros(boxes.shape[0], dtype=np.uint8)
return image_id, (boxes, labels, is_difficult)
def get_image(self, index):
image_info = self.data[index]
image = self._read_image(image_info['image_id'])
if self.transform:
image, _ = self.transform(image)
return image
def _read_data(self):
annotation_file = f"{self.root}/sub-{self.dataset_type}-annotations-bbox.csv"
logging.info(f'loading annotations from: {annotation_file}')
annotations = pd.read_csv(annotation_file)
logging.info(f'annotations loaded from: {annotation_file}')
class_names = ['BACKGROUND'] + sorted(list(annotations['ClassName'].unique()))
class_dict = {class_name: i for i, class_name in enumerate(class_names)}
data = []
for image_id, group in annotations.groupby("ImageID"):
img_path = os.path.join(self.root, self.dataset_type, image_id + '.jpg')
if os.path.isfile(img_path) is False:
logging.error(f'missing ImageID {image_id}.jpg - dropping from annotations')
continue
boxes = group.loc[:, ["XMin", "YMin", "XMax", "YMax"]].values.astype(np.float32)
# make labels 64 bits to satisfy the cross_entropy function
labels = np.array([class_dict[name] for name in group["ClassName"]], dtype='int64')
# print('found image {:s} ({:d})'.format(img_path, len(data)))
data.append({
'image_id': image_id,
'boxes': boxes,
'labels': labels
})
print('num images: {:d}'.format(len(data)))
return data, class_names, class_dict
def __len__(self):
return len(self.data)
def __repr__(self):
if self.class_stat is None:
self.class_stat = {name: 0 for name in self.class_names[1:]}
for example in self.data:
for class_index in example['labels']:
class_name = self.class_names[class_index]
self.class_stat[class_name] += 1
content = ["Dataset Summary:"
f"Number of Images: {len(self.data)}",
f"Minimum Number of Images for a Class: {self.min_image_num}",
"Label Distribution:"]
for class_name, num in self.class_stat.items():
content.append(f"\t{class_name}: {num}")
return "\n".join(content)
def _read_image(self, image_id):
image_file = self.root / self.dataset_type / f"{image_id}.jpg"
image = cv2.imread(str(image_file))
if image.shape[2] == 1:
image = cv2.cvtColor(image, cv2.COLOR_GRAY2RGB)
else:
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
return image
def _balance_data(self):
logging.info('balancing data')
label_image_indexes = [set() for _ in range(len(self.class_names))]
for i, image in enumerate(self.data):
for label_id in image['labels']:
label_image_indexes[label_id].add(i)
label_stat = [len(s) for s in label_image_indexes]
self.min_image_num = min(label_stat[1:])
sample_image_indexes = set()
for image_indexes in label_image_indexes[1:]:
image_indexes = np.array(list(image_indexes))
sub = np.random.permutation(image_indexes)[:self.min_image_num]
sample_image_indexes.update(sub)
sample_data = [self.data[i] for i in sample_image_indexes]
return sample_data

187 src/vision/datasets/voc_dataset.py Normal file

@@ -0,0 +1,187 @@
import logging
import os
import pathlib
import xml.etree.ElementTree as ET
import cv2
import numpy as np
class VOCDataset:
def __init__(self, root, transform=None, target_transform=None, is_test=False, keep_difficult=True,
label_file=None):
"""Dataset for VOC data.
Args:
root: the root of the VOC2007 or VOC2012 dataset, the directory contains the following sub-directories:
Annotations, ImageSets, JPEGImages, SegmentationClass, SegmentationObject.
"""
self.root = pathlib.Path(root)
self.transform = transform
self.target_transform = target_transform
# determine the image set file to use
if is_test:
image_sets_file = self.root / "ImageSets/Main/test.txt"
else:
image_sets_file = self.root / "ImageSets/Main/trainval.txt"
if not os.path.isfile(image_sets_file):
image_sets_default = self.root / "ImageSets/Main/default.txt" # CVAT only saves default.txt
if os.path.isfile(image_sets_default):
image_sets_file = image_sets_default
else:
raise IOError("missing ImageSet file {:s}".format(image_sets_file))
# read the image set ID's
self.ids = self._read_image_ids(image_sets_file)
self.keep_difficult = keep_difficult
# if the labels file exists, read in the class names
label_file_name = self.root / "labels.txt"
if os.path.isfile(label_file_name):
classes = []
# classes should be a line-separated list
with open(label_file_name, 'r') as infile:
for line in infile:
classes.append(line.rstrip())
# prepend BACKGROUND as first class
classes.insert(0, 'BACKGROUND')
# classes = [ elem.replace(" ", "") for elem in classes]
self.class_names = tuple(classes)
logging.info("VOC Labels read from file: " + str(self.class_names))
else:
logging.info("No labels file, using default VOC classes.")
self.class_names = ('BACKGROUND',
'aeroplane', 'bicycle', 'bird', 'boat',
'bottle', 'bus', 'car', 'cat', 'chair',
'cow', 'diningtable', 'dog', 'horse',
'motorbike', 'person', 'pottedplant',
'sheep', 'sofa', 'train', 'tvmonitor')
self.class_dict = {class_name: i for i, class_name in enumerate(self.class_names)}
def __getitem__(self, index):
image_id = self.ids[index]
boxes, labels, is_difficult = self._get_annotation(image_id)
if not self.keep_difficult:
boxes = boxes[is_difficult == 0]
labels = labels[is_difficult == 0]
# print('__getitem__ image_id=' + str(image_id) + ' \nboxes=' + str(boxes) + ' \nlabels=' + str(labels))
image = self._read_image(image_id)
if self.transform:
image, boxes, labels = self.transform(image, boxes, labels)
if self.target_transform:
boxes, labels = self.target_transform(boxes, labels)
return image, boxes, labels
def get_image(self, index):
image_id = self.ids[index]
image = self._read_image(image_id)
if self.transform:
image, _ = self.transform(image)
return image
def get_annotation(self, index):
image_id = self.ids[index]
return image_id, self._get_annotation(image_id)
def __len__(self):
return len(self.ids)
def _read_image_ids(self, image_sets_file):
ids = []
with open(image_sets_file) as f:
for line in f:
image_id = line.rstrip()
if len(image_id) <= 0:
                    print('warning - found empty line in {:s}, skipping line'.format(str(image_sets_file)))
continue
if self._get_num_annotations(image_id) > 0:
if self._find_image(image_id) is not None:
ids.append(line.rstrip())
else:
print('warning - could not find image {:s} - ignoring from dataset'.format(image_id))
else:
print('warning - image {:s} has no box/labels annotations, ignoring from dataset'.format(image_id))
return ids
def _get_num_annotations(self, image_id):
annotation_file = self.root / f"Annotations/{image_id}.xml"
objects = ET.parse(annotation_file).findall("object")
return len(objects)
def _get_annotation(self, image_id):
annotation_file = self.root / f"Annotations/{image_id}.xml"
objects = ET.parse(annotation_file).findall("object")
boxes = []
labels = []
is_difficult = []
for object in objects:
class_name = object.find('name').text.strip() # .lower().strip()
            # we're only concerned with classes in our list
if class_name in self.class_dict:
bbox = object.find('bndbox')
# VOC dataset format follows Matlab, in which indexes start from 0
x1 = float(bbox.find('xmin').text) - 1
y1 = float(bbox.find('ymin').text) - 1
x2 = float(bbox.find('xmax').text) - 1
y2 = float(bbox.find('ymax').text) - 1
boxes.append([x1, y1, x2, y2])
labels.append(self.class_dict[class_name])
# retrieve <difficult> element
is_difficult_obj = object.find('difficult')
is_difficult_str = '0'
if is_difficult_obj is not None:
is_difficult_str = object.find('difficult').text
is_difficult.append(int(is_difficult_str) if is_difficult_str else 0)
else:
print("warning - image {:s} has object with unknown class '{:s}'".format(image_id, class_name))
return (np.array(boxes, dtype=np.float32),
np.array(labels, dtype=np.int64),
np.array(is_difficult, dtype=np.uint8))
def _find_image(self, image_id):
img_extensions = (
'.jpg', '.JPG', '.jpeg', '.JPEG', '.png', '.PNG', '.bmp', '.BMP', '.tif', '.TIF', '.tiff', '.TIFF')
for ext in img_extensions:
image_file = os.path.join(self.root, "JPEGImages/{:s}{:s}".format(image_id, ext))
if os.path.exists(image_file):
return image_file
return None
def _read_image(self, image_id):
image_file = self._find_image(image_id)
        if image_file is None:
            raise IOError('failed to find image for id ' + image_id)  # image_file is None here, so report the id
image = cv2.imread(str(image_file))
if image is None or image.size == 0:
raise IOError('failed to load ' + str(image_file))
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
return image
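
Note that BACKGROUND is prepended when labels.txt is read, so the three names written by 2_make_voc.py map to indices 1-3 and index 0 stays reserved for background (which the loss and the eval loop skip):

    # labels.txt: circle, square, huan  ->  class_names = ('BACKGROUND', 'circle', 'square', 'huan')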


60 src/vision/nn/alexnet.py Normal file

@@ -0,0 +1,60 @@
import torch.nn as nn
import torch.utils.model_zoo as model_zoo
# copied from torchvision (https://github.com/pytorch/vision/blob/master/torchvision/models/alexnet.py).
# The forward function is modified for model pruning.
__all__ = ['AlexNet', 'alexnet']
model_urls = {
'alexnet': 'https://download.pytorch.org/models/alexnet-owt-4df8aa71.pth',
}
class AlexNet(nn.Module):
def __init__(self, num_classes=1000):
super(AlexNet, self).__init__()
self.features = nn.Sequential(
nn.Conv2d(3, 64, kernel_size=11, stride=4, padding=2),
nn.ReLU(inplace=True),
nn.MaxPool2d(kernel_size=3, stride=2),
nn.Conv2d(64, 192, kernel_size=5, padding=2),
nn.ReLU(inplace=True),
nn.MaxPool2d(kernel_size=3, stride=2),
nn.Conv2d(192, 384, kernel_size=3, padding=1),
nn.ReLU(inplace=True),
nn.Conv2d(384, 256, kernel_size=3, padding=1),
nn.ReLU(inplace=True),
nn.Conv2d(256, 256, kernel_size=3, padding=1),
nn.ReLU(inplace=True),
nn.MaxPool2d(kernel_size=3, stride=2),
)
self.classifier = nn.Sequential(
nn.Dropout(),
nn.Linear(256 * 6 * 6, 4096),
nn.ReLU(inplace=True),
nn.Dropout(),
nn.Linear(4096, 4096),
nn.ReLU(inplace=True),
nn.Linear(4096, num_classes),
)
def forward(self, x):
x = self.features(x)
x = x.view(x.size(0), -1)
x = self.classifier(x)
return x
def alexnet(pretrained=False, **kwargs):
r"""AlexNet model architecture from the
`"One weird trick..." <https://arxiv.org/abs/1404.5997>`_ paper.
Args:
pretrained (bool): If True, returns a model pre-trained on ImageNet
"""
model = AlexNet(**kwargs)
if pretrained:
model.load_state_dict(model_zoo.load_url(model_urls['alexnet']))
return model

@@ -0,0 +1,52 @@
# borrowed from "https://github.com/marvis/pytorch-mobilenet"
import torch.nn as nn
import torch.nn.functional as F
class MobileNetV1(nn.Module):
def __init__(self, num_classes=1024):
super(MobileNetV1, self).__init__()
def conv_bn(inp, oup, stride):
return nn.Sequential(
nn.Conv2d(inp, oup, 3, stride, 1, bias=False),
nn.BatchNorm2d(oup),
nn.ReLU(inplace=True)
)
def conv_dw(inp, oup, stride):
return nn.Sequential(
nn.Conv2d(inp, inp, 3, stride, 1, groups=inp, bias=False),
nn.BatchNorm2d(inp),
nn.ReLU(inplace=True),
nn.Conv2d(inp, oup, 1, 1, 0, bias=False),
nn.BatchNorm2d(oup),
nn.ReLU(inplace=True),
)
self.model = nn.Sequential(
conv_bn(3, 32, 2),
conv_dw(32, 64, 1),
conv_dw(64, 128, 2),
conv_dw(128, 128, 1),
conv_dw(128, 256, 2),
conv_dw(256, 256, 1),
conv_dw(256, 512, 2),
conv_dw(512, 512, 1),
conv_dw(512, 512, 1),
conv_dw(512, 512, 1),
conv_dw(512, 512, 1),
conv_dw(512, 512, 1),
conv_dw(512, 1024, 2),
conv_dw(1024, 1024, 1),
)
self.fc = nn.Linear(1024, num_classes)
def forward(self, x):
x = self.model(x)
x = F.avg_pool2d(x, 7)
x = x.view(-1, 1024)
x = self.fc(x)
return x
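
The depthwise-separable pairs are where the savings come from; counting weights for conv_dw(32, 64, 1):

    # depthwise 3x3: 3*3*32 = 288 weights; pointwise 1x1: 32*64 = 2048 weights
    # total 2336, vs 3*3*32*64 = 18432 for a standard 3x3 conv (~7.9x fewer, ignoring BN)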

@@ -0,0 +1,175 @@
import math
import torch.nn as nn
# Modified from https://github.com/tonylins/pytorch-mobilenet-v2/blob/master/MobileNetV2.py.
# In this version, Relu6 is replaced with Relu to make it ONNX compatible.
# BatchNorm layers are optional, to make batch-norm fusion straightforward.
def conv_bn(inp, oup, stride, use_batch_norm=True, onnx_compatible=False):
ReLU = nn.ReLU if onnx_compatible else nn.ReLU6
if use_batch_norm:
return nn.Sequential(
nn.Conv2d(inp, oup, 3, stride, 1, bias=False),
nn.BatchNorm2d(oup),
ReLU(inplace=True)
)
else:
return nn.Sequential(
nn.Conv2d(inp, oup, 3, stride, 1, bias=False),
ReLU(inplace=True)
)
def conv_1x1_bn(inp, oup, use_batch_norm=True, onnx_compatible=False):
ReLU = nn.ReLU if onnx_compatible else nn.ReLU6
if use_batch_norm:
return nn.Sequential(
nn.Conv2d(inp, oup, 1, 1, 0, bias=False),
nn.BatchNorm2d(oup),
ReLU(inplace=True)
)
else:
return nn.Sequential(
nn.Conv2d(inp, oup, 1, 1, 0, bias=False),
ReLU(inplace=True)
)
class InvertedResidual(nn.Module):
def __init__(self, inp, oup, stride, expand_ratio, use_batch_norm=True, onnx_compatible=False):
super(InvertedResidual, self).__init__()
ReLU = nn.ReLU if onnx_compatible else nn.ReLU6
self.stride = stride
assert stride in [1, 2]
hidden_dim = round(inp * expand_ratio)
self.use_res_connect = self.stride == 1 and inp == oup
if expand_ratio == 1:
if use_batch_norm:
self.conv = nn.Sequential(
# dw
nn.Conv2d(hidden_dim, hidden_dim, 3, stride, 1, groups=hidden_dim, bias=False),
nn.BatchNorm2d(hidden_dim),
ReLU(inplace=True),
# pw-linear
nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False),
nn.BatchNorm2d(oup),
)
else:
self.conv = nn.Sequential(
# dw
nn.Conv2d(hidden_dim, hidden_dim, 3, stride, 1, groups=hidden_dim, bias=False),
ReLU(inplace=True),
# pw-linear
nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False),
)
else:
if use_batch_norm:
self.conv = nn.Sequential(
# pw
nn.Conv2d(inp, hidden_dim, 1, 1, 0, bias=False),
nn.BatchNorm2d(hidden_dim),
ReLU(inplace=True),
# dw
nn.Conv2d(hidden_dim, hidden_dim, 3, stride, 1, groups=hidden_dim, bias=False),
nn.BatchNorm2d(hidden_dim),
ReLU(inplace=True),
# pw-linear
nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False),
nn.BatchNorm2d(oup),
)
else:
self.conv = nn.Sequential(
# pw
nn.Conv2d(inp, hidden_dim, 1, 1, 0, bias=False),
ReLU(inplace=True),
# dw
nn.Conv2d(hidden_dim, hidden_dim, 3, stride, 1, groups=hidden_dim, bias=False),
ReLU(inplace=True),
# pw-linear
nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False),
)
def forward(self, x):
if self.use_res_connect:
return x + self.conv(x)
else:
return self.conv(x)
class MobileNetV2(nn.Module):
def __init__(self, n_class=1000, input_size=224, width_mult=1., dropout_ratio=0.2,
use_batch_norm=True, onnx_compatible=False):
super(MobileNetV2, self).__init__()
block = InvertedResidual
input_channel = 32
last_channel = 1280
interverted_residual_setting = [
# t, c, n, s
[1, 16, 1, 1],
[6, 24, 2, 2],
[6, 32, 3, 2],
[6, 64, 4, 2],
[6, 96, 3, 1],
[6, 160, 3, 2],
[6, 320, 1, 1],
]
# building first layer
assert input_size % 32 == 0
input_channel = int(input_channel * width_mult)
self.last_channel = int(last_channel * width_mult) if width_mult > 1.0 else last_channel
self.features = [conv_bn(3, input_channel, 2, onnx_compatible=onnx_compatible)]
# building inverted residual blocks
for t, c, n, s in interverted_residual_setting:
output_channel = int(c * width_mult)
for i in range(n):
if i == 0:
self.features.append(block(input_channel, output_channel, s,
expand_ratio=t, use_batch_norm=use_batch_norm,
onnx_compatible=onnx_compatible))
else:
self.features.append(block(input_channel, output_channel, 1,
expand_ratio=t, use_batch_norm=use_batch_norm,
onnx_compatible=onnx_compatible))
input_channel = output_channel
# building last several layers
self.features.append(conv_1x1_bn(input_channel, self.last_channel,
use_batch_norm=use_batch_norm, onnx_compatible=onnx_compatible))
# make it nn.Sequential
self.features = nn.Sequential(*self.features)
# building classifier
self.classifier = nn.Sequential(
nn.Dropout(dropout_ratio),
nn.Linear(self.last_channel, n_class),
)
self._initialize_weights()
def forward(self, x):
x = self.features(x)
x = x.mean(3).mean(2)
x = self.classifier(x)
return x
def _initialize_weights(self):
for m in self.modules():
if isinstance(m, nn.Conv2d):
n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
m.weight.data.normal_(0, math.sqrt(2. / n))
if m.bias is not None:
m.bias.data.zero_()
elif isinstance(m, nn.BatchNorm2d):
m.weight.data.fill_(1)
m.bias.data.zero_()
elif isinstance(m, nn.Linear):
n = m.weight.size(1)
m.weight.data.normal_(0, 0.01)
m.bias.data.zero_()

46 src/vision/nn/multibox_loss.py Normal file

@@ -0,0 +1,46 @@
import torch
import torch.nn as nn
import torch.nn.functional as F
from ..utils import box_utils
class MultiboxLoss(nn.Module):
def __init__(self, priors, iou_threshold, neg_pos_ratio,
center_variance, size_variance, device):
"""Implement SSD Multibox Loss.
Basically, Multibox loss combines classification loss
and Smooth L1 regression loss.
"""
super(MultiboxLoss, self).__init__()
self.iou_threshold = iou_threshold
self.neg_pos_ratio = neg_pos_ratio
self.center_variance = center_variance
self.size_variance = size_variance
        self.priors = priors.to(device)  # Tensor.to() is not in-place; keep the returned tensor
def forward(self, confidence, predicted_locations, labels, gt_locations):
"""Compute classification loss and smooth l1 loss.
Args:
confidence (batch_size, num_priors, num_classes): class predictions.
locations (batch_size, num_priors, 4): predicted locations.
labels (batch_size, num_priors): real labels of all the priors.
boxes (batch_size, num_priors, 4): real boxes corresponding all the priors.
"""
num_classes = confidence.size(2)
with torch.no_grad():
# derived from cross_entropy=sum(log(p))
loss = -F.log_softmax(confidence, dim=2)[:, :, 0]
mask = box_utils.hard_negative_mining(loss, labels, self.neg_pos_ratio)
confidence = confidence[mask, :]
        classification_loss = F.cross_entropy(confidence.reshape(-1, num_classes), labels[mask], reduction='sum')  # size_average=False is deprecated
pos_mask = labels > 0
predicted_locations = predicted_locations[pos_mask, :].reshape(-1, 4)
gt_locations = gt_locations[pos_mask, :].reshape(-1, 4)
        smooth_l1_loss = F.smooth_l1_loss(predicted_locations, gt_locations, reduction='sum')
num_pos = gt_locations.size(0)
return smooth_l1_loss / num_pos, classification_loss / num_pos
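
This matches how 3_train_ssd.py instantiates it; with neg_pos_ratio=3, hard negative mining keeps at most three negative priors per positive one:

    criterion = MultiboxLoss(config.priors, iou_threshold=0.5, neg_pos_ratio=3,
                             center_variance=0.1, size_variance=0.2, device=DEVICE)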

@@ -0,0 +1,19 @@
import torch
import torch.nn as nn
import torch.nn.functional as F
class ScaledL2Norm(nn.Module):
def __init__(self, in_channels, initial_scale):
super(ScaledL2Norm, self).__init__()
self.in_channels = in_channels
self.scale = nn.Parameter(torch.Tensor(in_channels))
self.initial_scale = initial_scale
self.reset_parameters()
def forward(self, x):
return (F.normalize(x, p=2, dim=1)
* self.scale.unsqueeze(0).unsqueeze(2).unsqueeze(3))
def reset_parameters(self):
self.scale.data.fill_(self.initial_scale)

127 src/vision/nn/squeezenet.py Normal file

@@ -0,0 +1,127 @@
import torch
import torch.nn as nn
import torch.nn.init as init
import torch.utils.model_zoo as model_zoo
__all__ = ['SqueezeNet', 'squeezenet1_0', 'squeezenet1_1']
model_urls = {
'squeezenet1_0': 'https://download.pytorch.org/models/squeezenet1_0-a815701f.pth',
'squeezenet1_1': 'https://download.pytorch.org/models/squeezenet1_1-f364aa15.pth',
}
class Fire(nn.Module):
def __init__(self, inplanes, squeeze_planes,
expand1x1_planes, expand3x3_planes):
super(Fire, self).__init__()
self.inplanes = inplanes
self.squeeze = nn.Conv2d(inplanes, squeeze_planes, kernel_size=1)
self.squeeze_activation = nn.ReLU(inplace=True)
self.expand1x1 = nn.Conv2d(squeeze_planes, expand1x1_planes,
kernel_size=1)
self.expand1x1_activation = nn.ReLU(inplace=True)
self.expand3x3 = nn.Conv2d(squeeze_planes, expand3x3_planes,
kernel_size=3, padding=1)
self.expand3x3_activation = nn.ReLU(inplace=True)
def forward(self, x):
x = self.squeeze_activation(self.squeeze(x))
return torch.cat([
self.expand1x1_activation(self.expand1x1(x)),
self.expand3x3_activation(self.expand3x3(x))
], 1)
class SqueezeNet(nn.Module):
def __init__(self, version=1.0, num_classes=1000):
super(SqueezeNet, self).__init__()
if version not in [1.0, 1.1]:
raise ValueError("Unsupported SqueezeNet version {version}:"
"1.0 or 1.1 expected".format(version=version))
self.num_classes = num_classes
if version == 1.0:
self.features = nn.Sequential(
nn.Conv2d(3, 96, kernel_size=7, stride=2),
nn.ReLU(inplace=True),
nn.MaxPool2d(kernel_size=3, stride=2, ceil_mode=True),
Fire(96, 16, 64, 64),
Fire(128, 16, 64, 64),
Fire(128, 32, 128, 128),
nn.MaxPool2d(kernel_size=3, stride=2, ceil_mode=True),
Fire(256, 32, 128, 128),
Fire(256, 48, 192, 192),
Fire(384, 48, 192, 192),
Fire(384, 64, 256, 256),
nn.MaxPool2d(kernel_size=3, stride=2, ceil_mode=True),
Fire(512, 64, 256, 256),
)
else:
self.features = nn.Sequential(
nn.Conv2d(3, 64, kernel_size=3, stride=2),
nn.ReLU(inplace=True),
nn.MaxPool2d(kernel_size=3, stride=2),
Fire(64, 16, 64, 64),
Fire(128, 16, 64, 64),
nn.MaxPool2d(kernel_size=3, stride=2),
Fire(128, 32, 128, 128),
Fire(256, 32, 128, 128),
nn.MaxPool2d(kernel_size=3, stride=2),
Fire(256, 48, 192, 192),
Fire(384, 48, 192, 192),
Fire(384, 64, 256, 256),
Fire(512, 64, 256, 256),
)
# Final convolution is initialized differently from the rest
final_conv = nn.Conv2d(512, self.num_classes, kernel_size=1)
self.classifier = nn.Sequential(
nn.Dropout(p=0.5),
final_conv,
nn.ReLU(inplace=True),
nn.AvgPool2d(13, stride=1)
)
for m in self.modules():
if isinstance(m, nn.Conv2d):
if m is final_conv:
init.normal_(m.weight, mean=0.0, std=0.01)
else:
init.kaiming_uniform_(m.weight)
if m.bias is not None:
init.constant_(m.bias, 0)
def forward(self, x):
x = self.features(x)
x = self.classifier(x)
return x.view(x.size(0), self.num_classes)
def squeezenet1_0(pretrained=False, **kwargs):
r"""SqueezeNet model architecture from the `"SqueezeNet: AlexNet-level
accuracy with 50x fewer parameters and <0.5MB model size"
<https://arxiv.org/abs/1602.07360>`_ paper.
Args:
pretrained (bool): If True, returns a model pre-trained on ImageNet
"""
model = SqueezeNet(version=1.0, **kwargs)
if pretrained:
model.load_state_dict(model_zoo.load_url(model_urls['squeezenet1_0']))
return model
def squeezenet1_1(pretrained=False, **kwargs):
r"""SqueezeNet 1.1 model from the `official SqueezeNet repo
<https://github.com/DeepScale/SqueezeNet/tree/master/SqueezeNet_v1.1>`_.
SqueezeNet 1.1 has 2.4x less computation and slightly fewer parameters
than SqueezeNet 1.0, without sacrificing accuracy.
Args:
pretrained (bool): If True, returns a model pre-trained on ImageNet
"""
model = SqueezeNet(version=1.1, **kwargs)
if pretrained:
model.load_state_dict(model_zoo.load_url(model_urls['squeezenet1_1']))
return model
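
The Fire module concatenates its 1x1 and 3x3 expand branches, so its output has expand1x1_planes + expand3x3_planes channels while the spatial size is preserved. A quick check (input size is arbitrary):

import torch
fire = Fire(96, 16, 64, 64)
out = fire(torch.randn(1, 96, 54, 54))
print(out.shape)  # torch.Size([1, 128, 54, 54])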

25
src/vision/nn/vgg.py Normal file

@ -0,0 +1,25 @@
import torch.nn as nn
# borrowed from https://github.com/amdegroot/ssd.pytorch/blob/master/ssd.py
def vgg(cfg, batch_norm=False):
layers = []
in_channels = 3
for v in cfg:
if v == 'M':
layers += [nn.MaxPool2d(kernel_size=2, stride=2)]
elif v == 'C':
layers += [nn.MaxPool2d(kernel_size=2, stride=2, ceil_mode=True)]
else:
conv2d = nn.Conv2d(in_channels, v, kernel_size=3, padding=1)
if batch_norm:
layers += [conv2d, nn.BatchNorm2d(v), nn.ReLU(inplace=True)]
else:
layers += [conv2d, nn.ReLU(inplace=True)]
in_channels = v
pool5 = nn.MaxPool2d(kernel_size=3, stride=1, padding=1)
conv6 = nn.Conv2d(512, 1024, kernel_size=3, padding=6, dilation=6)
conv7 = nn.Conv2d(1024, 1024, kernel_size=1)
layers += [pool5, conv6,
nn.ReLU(inplace=True), conv7, nn.ReLU(inplace=True)]
return layers
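
The helper returns a plain list of layers; 'M' and 'C' insert max pools ('C' with ceil_mode), and conv6/conv7 replace VGG's fully connected head. The SSD factory later in this commit wraps it like this:

from torch.nn import ModuleList
cfg = [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 'C', 512, 512, 512, 'M', 512, 512, 512]
base_net = ModuleList(vgg(cfg))  # same configuration used by create_vgg_ssd further down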

235
src/vision/prunning/prunner.py Normal file

@ -0,0 +1,235 @@
import logging
from heapq import nsmallest
import torch
import torch.nn as nn
from ..utils.model_book import ModelBook
class ModelPrunner:
def __init__(self, model, train_fun, ignored_paths=[]):
""" Implement the pruning algorithm described in the paper https://arxiv.org/pdf/1611.06440.pdf .
The prunning criteria is dC/dh * h, while C is the cost, h is the activation.
"""
self.model = model
self.train_fun = train_fun
self.ignored_paths = ignored_paths
self.book = ModelBook(self.model)
self.outputs = {}
self.grads = {}
self.handles = []
self.decendent_batch_norms = {}  # batch norms impacted by the conv layers.
self.last_conv_path = None # used to trace the graph
self.descendent_convs = {} # descendants impacted by the conv layers.
self.descendent_linears = {} # descendants impacted by the linear layers.
self.last_linear_path = None # used to trace the graph
def _make_new_conv(self, conv, filter_index, channel_type="out"):
if not isinstance(conv, nn.Conv2d):
raise TypeError(f"The module is not Conv2d, but {type(conv)}.")
if channel_type == "out":
new_conv = nn.Conv2d(conv.in_channels, conv.out_channels - 1, conv.kernel_size, conv.stride,
conv.padding, conv.dilation, conv.groups, conv.bias is not None)
mask = torch.ones(conv.out_channels, dtype=torch.uint8)
mask[filter_index] = 0
new_conv.weight.data = conv.weight.data[mask, :, :, :]
if conv.bias is not None:
new_conv.bias.data = conv.bias.data[mask]
elif channel_type == 'in':
new_conv = nn.Conv2d(conv.in_channels - 1, conv.out_channels, conv.kernel_size, conv.stride,
conv.padding, conv.dilation, conv.groups, conv.bias is not None)
mask = torch.ones(conv.in_channels, dtype=torch.uint8)
mask[filter_index] = 0
new_conv.weight.data = conv.weight.data[:, mask, :, :]
if conv.bias is not None:
new_conv.bias.data = conv.bias.data
else:
raise ValueError(f"{channel_type} should be either 'in' or 'out'.")
return new_conv
def remove_conv_filter(self, path, filter_index):
conv = self.book.get_module(path)
logging.info(f'Prune Conv: {"/".join(path)}, Filter: {filter_index}, Layer: {conv}')
new_conv = self._make_new_conv(conv, filter_index, channel_type="out")
self._update_model(path, new_conv)
next_conv_path = self.descendent_convs.get(path)
if next_conv_path:
next_conv = self.book.get_module(next_conv_path)
new_next_conv = self._make_new_conv(next_conv, filter_index, channel_type="in")
self._update_model(next_conv_path, new_next_conv)
# reduce the num_features of batch norm
batch_norm_path = self.decendent_batch_norms.get(path)
if batch_norm_path:
batch_norm = self.book.get_module(batch_norm_path)
new_batch_norm = nn.BatchNorm2d(batch_norm.num_features - 1)
self._update_model(batch_norm_path, new_batch_norm)
# reduce the in channels of linear layer
linear_path = self.descendent_linears.get(path)
if linear_path:
linear = self.book.get_module(linear_path)
new_linear = self._make_new_linear(linear, filter_index, conv, channel_type="in")
self._update_model(linear_path, new_linear)
@staticmethod
def _make_new_linear(linear, feature_index, conv=None, channel_type="out"):
if channel_type == "out":
new_linear = nn.Linear(linear.in_features, linear.out_features - 1,
bias=linear.bias is not None)
mask = torch.ones(linear.out_features, dtype=torch.uint8)
mask[feature_index] = 0
new_linear.weight.data = linear.weight.data[mask, :]
if linear.bias is not None:
new_linear.bias.data = linear.bias.data[mask]
elif channel_type == "in":
if conv:
block = int(linear.in_features / conv.out_channels)
else:
block = 1
new_linear = nn.Linear(linear.in_features - block, linear.out_features,
bias=linear.bias is not None)
start_index = feature_index * block
end_index = (feature_index + 1) * block
mask = torch.ones(linear.in_features, dtype=torch.uint8)
mask[start_index: end_index] = 0
new_linear.weight.data = linear.weight.data[:, mask]
if linear.bias is not None:
new_linear.bias.data = linear.bias.data
else:
raise ValueError(f"{channel_type} should be either 'in' or 'out'.")
return new_linear
def prune_conv_layers(self, num=1):
"""Prune one conv2d filter.
"""
self.register_conv_hooks()
before_loss, before_accuracy = self.train_fun(self.model)
ranks = []
for path, output in self.outputs.items():
output = output.data
grad = self.grads[path].data
v = grad * output
v = v.sum(0).sum(1).sum(1) # sum to the channel axis.
v = torch.abs(v)
v = v / torch.sqrt(torch.sum(v * v)) # normalize
for i, e in enumerate(v):
ranks.append((path, i, e))
to_prune = nsmallest(num, ranks, key=lambda t: t[2])
to_prune = sorted(to_prune, key=lambda t: (
t[0], -t[1])) # prune the filters with bigger indexes first to avoid rearrangement.
for path, filter_index, value in to_prune:
self.remove_conv_filter(path, filter_index)
self.deregister_hooks()
after_loss, after_accuracy = self.train_fun(self.model)
return after_loss - before_loss, after_accuracy - before_accuracy
def register_conv_hooks(self):
"""Run register before training for pruning."""
self.outputs.clear()
self.grads.clear()
self.handles.clear()
self.last_conv_path = None
self.decendent_batch_norms.clear()
self.descendent_convs.clear()
self.descendent_linears.clear()
def forward_hook(m, input, output):
path = self.book.get_path(m)
if isinstance(m, nn.Conv2d):
if path not in self.ignored_paths:
self.outputs[path] = output
if self.last_conv_path:
self.descendent_convs[self.last_conv_path] = path
self.last_conv_path = path
elif isinstance(m, nn.BatchNorm2d):
if self.last_conv_path:
self.decendent_batch_norms[self.last_conv_path] = path
elif isinstance(m, nn.Linear):
if self.last_conv_path:
self.descendent_linears[self.last_conv_path] = path
self.last_conv_path = None # after a linear layer the conv layer doesn't matter
def backward_hook(m, input, output):
path = self.book.get_path(m)
self.grads[path] = output[0]
for path, m in self.book.modules(module_type=(nn.Conv2d, nn.BatchNorm2d, nn.Linear)):
h = m.register_forward_hook(forward_hook)
self.handles.append(h)
h = m.register_backward_hook(backward_hook)
self.handles.append(h)
def deregister_hooks(self):
"""Run degresiter before retraining to recover the model"""
for handle in self.handles:
handle.remove()
def prune_linear_layers(self, num=1):
self.register_linear_hooks()
before_loss, before_accuracy = self.train_fun(self.model)
ranks = []
for path, output in self.outputs.items():
output = output.data
grad = self.grads[path].data
v = grad * output
v = v.sum(0) # sum to the channel axis.
v = torch.abs(v)
v = v / torch.sqrt(torch.sum(v * v)) # normalize
for i, e in enumerate(v):
ranks.append((path, i, e))
to_prune = nsmallest(num, ranks, key=lambda t: t[2])
to_prune = sorted(to_prune, key=lambda t: (t[0], -t[1]))
for path, feature_index, value in to_prune:
self.remove_linear_feature(path, feature_index)
self.deregister_hooks()
after_loss, after_accuracy = self.train_fun(self.model)
return after_loss - before_loss, after_accuracy - before_accuracy
def register_linear_hooks(self):
self.outputs.clear()
self.grads.clear()
self.handles.clear()
self.descendent_linears.clear()
self.last_linear_path = None
def forward_hook(m, input, output):
path = self.book.get_path(m)
if path not in self.ignored_paths:
self.outputs[path] = output
if self.last_linear_path:
self.descendent_linears[self.last_linear_path] = path
self.last_linear_path = path
def backward_hook(m, input, output):
path = self.book.get_path(m)
self.grads[path] = output[0]
for _, m in self.book.linear_modules():
h = m.register_forward_hook(forward_hook)
self.handles.append(h)
h = m.register_backward_hook(backward_hook)
self.handles.append(h)
def remove_linear_feature(self, path, feature_index):
linear = self.book.get_module(path)
logging.info(f'Prune Linear: {"/".join(path)}, Filter: {feature_index}, Layer: {linear}')
new_linear = self._make_new_linear(linear, feature_index, channel_type="out")
self._update_model(path, new_linear)
# update following linear layers
next_linear_path = self.descendent_linears.get(path)
if next_linear_path:
next_linear = self.book.get_module(next_linear_path)
new_next_linear = self._make_new_linear(next_linear, feature_index, channel_type='in')
self._update_model(next_linear_path, new_next_linear)
def _update_model(self, path, module):
parent = self.book.get_module(path[:-1])
parent._modules[path[-1]] = module
self.book.update(path, module)
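
A hedged usage sketch: `train_fun` is caller-supplied, must run forward and backward passes (so the hooks can record activations and gradients), and must return a `(loss, accuracy)` pair; each prune_conv_layers call then removes the `num` lowest-ranked filters and reports how both metrics changed. `net` here stands for any model with conv layers:

def train_fun(model):
    # placeholder: a real implementation trains/evaluates the model and returns its metrics
    return 0.0, 0.0

prunner = ModelPrunner(net, train_fun, ignored_paths=[])
loss_diff, accuracy_diff = prunner.prune_conv_layers(num=4)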

32
src/vision/ssd/config/mobilenetv1_ssd_config.py Normal file

@ -0,0 +1,32 @@
import numpy as np
from vision.utils.box_utils import SSDSpec, SSDBoxSizes, generate_ssd_priors
image_size = 300
image_mean = np.array([127, 127, 127]) # RGB layout
image_std = 128.0
iou_threshold = 0.45
center_variance = 0.1
size_variance = 0.2
specs = [
SSDSpec(19, 16, SSDBoxSizes(60, 105), [2, 3]),
SSDSpec(10, 32, SSDBoxSizes(105, 150), [2, 3]),
SSDSpec(5, 64, SSDBoxSizes(150, 195), [2, 3]),
SSDSpec(3, 100, SSDBoxSizes(195, 240), [2, 3]),
SSDSpec(2, 150, SSDBoxSizes(240, 285), [2, 3]),
SSDSpec(1, 300, SSDBoxSizes(285, 330), [2, 3])
]
priors = generate_ssd_priors(specs, image_size)
# print(' ')
# print('SSD-Mobilenet-v1 priors:')
# print(priors.shape)
# print(priors)
# print(' ')
# import torch
# torch.save(priors, 'mb1-ssd-priors.pt')
# np.savetxt('mb1-ssd-priors.txt', priors.numpy())
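
Each spec contributes feature_map_size^2 locations with 2 + 2*len(aspect_ratios) priors apiece, so this config yields (361 + 100 + 25 + 9 + 4 + 1) * 6 = 3000 priors:

assert priors.shape == (3000, 4)  # illustrative check of the arithmetic above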

21
src/vision/ssd/config/squeezenet_ssd_config.py Normal file

@ -0,0 +1,21 @@
import numpy as np
from vision.utils.box_utils import SSDSpec, SSDBoxSizes, generate_ssd_priors
image_size = 300
image_mean = np.array([127, 127, 127]) # RGB layout
image_std = 128.0
iou_threshold = 0.45
center_variance = 0.1
size_variance = 0.2
specs = [
SSDSpec(17, 16, SSDBoxSizes(60, 105), [2, 3]),
SSDSpec(10, 32, SSDBoxSizes(105, 150), [2, 3]),
SSDSpec(5, 64, SSDBoxSizes(150, 195), [2, 3]),
SSDSpec(3, 100, SSDBoxSizes(195, 240), [2, 3]),
SSDSpec(2, 150, SSDBoxSizes(240, 285), [2, 3]),
SSDSpec(1, 300, SSDBoxSizes(285, 330), [2, 3])
]
priors = generate_ssd_priors(specs, image_size)
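
Same arithmetic here, with a 17x17 first feature map: (289 + 100 + 25 + 9 + 4 + 1) * 6 = 2568 priors:

assert priors.shape == (2568, 4)  # illustrative check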

22
src/vision/ssd/config/vgg_ssd_config.py Normal file

@ -0,0 +1,22 @@
import numpy as np
from vision.utils.box_utils import SSDSpec, SSDBoxSizes, generate_ssd_priors
image_size = 300
image_mean = np.array([123, 117, 104]) # RGB layout
image_std = 1.0
iou_threshold = 0.45
center_variance = 0.1
size_variance = 0.2
specs = [
SSDSpec(38, 8, SSDBoxSizes(30, 60), [2]),
SSDSpec(19, 16, SSDBoxSizes(60, 111), [2, 3]),
SSDSpec(10, 32, SSDBoxSizes(111, 162), [2, 3]),
SSDSpec(5, 64, SSDBoxSizes(162, 213), [2, 3]),
SSDSpec(3, 100, SSDBoxSizes(213, 264), [2]),
SSDSpec(1, 300, SSDBoxSizes(264, 315), [2])
]
priors = generate_ssd_priors(specs, image_size)
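
Here the per-location counts differ by spec: aspect_ratios [2] gives 4 priors, [2, 3] gives 6, so the total is 38^2*4 + 19^2*6 + 10^2*6 + 5^2*6 + 3^2*4 + 1*4 = 8732, the same number asserted by test_vgg_ssd.py below:

assert priors.shape == (8732, 4)  # illustrative check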

62
src/vision/ssd/data_preprocessing.py Normal file

@ -0,0 +1,62 @@
from ..transforms.transforms import *
class TrainAugmentation:
def __init__(self, size, mean=0, std=1.0):
"""
Args:
size: the size of the final image.
mean: mean pixel value per channel.
std: value the image is divided by after mean subtraction.
"""
self.mean = mean
self.size = size
self.augment = Compose([
ConvertFromInts(),
PhotometricDistort(),
Expand(self.mean),
RandomSampleCrop(),
RandomMirror(),
ToPercentCoords(),
Resize(self.size),
SubtractMeans(self.mean),
lambda img, boxes=None, labels=None: (img / std, boxes, labels),
ToTensor(),
])
def __call__(self, img, boxes, labels):
"""
Args:
img: the output of cv.imread in RGB layout.
boxes: bounding boxes in the form of (x1, y1, x2, y2).
labels: labels of boxes.
"""
return self.augment(img, boxes, labels)
class TestTransform:
def __init__(self, size, mean=0.0, std=1.0):
self.transform = Compose([
ToPercentCoords(),
Resize(size),
SubtractMeans(mean),
lambda img, boxes=None, labels=None: (img / std, boxes, labels),
ToTensor(),
])
def __call__(self, image, boxes, labels):
return self.transform(image, boxes, labels)
class PredictionTransform:
def __init__(self, size, mean=0.0, std=1.0):
self.transform = Compose([
Resize(size),
SubtractMeans(mean),
lambda img, boxes=None, labels=None: (img / std, boxes, labels),
ToTensor()
])
def __call__(self, image):
image, _, _ = self.transform(image)
return image
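
A hedged sketch of the training pipeline: the image is an HWC RGB array, boxes are absolute pixel corner coordinates (the pipeline converts them to percent coordinates), and the output image is a CHW float tensor:

import numpy as np
train_transform = TrainAugmentation(size=300, mean=np.array([127, 127, 127]), std=128.0)
img = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
boxes = np.array([[50.0, 60.0, 200.0, 220.0]], dtype=np.float32)
labels = np.array([1])
tensor_img, out_boxes, out_labels = train_transform(img, boxes, labels)
print(tensor_img.shape)  # torch.Size([3, 300, 300]); out_boxes now in percent coordinates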

77
src/vision/ssd/fpn_mobilenetv1_ssd.py Normal file

@ -0,0 +1,77 @@
import torch
from torch.nn import Conv2d, Sequential, ModuleList, ReLU
from .config import mobilenetv1_ssd_config as config
from .fpn_ssd import FPNSSD
from .predictor import Predictor
from ..nn.mobilenet import MobileNetV1
def create_fpn_mobilenetv1_ssd(num_classes):
base_net = MobileNetV1(1001).features # disable dropout layer
source_layer_indexes = [
(69, Conv2d(in_channels=512, out_channels=256, kernel_size=1)),
(len(base_net), Conv2d(in_channels=1024, out_channels=256, kernel_size=1)),
]
extras = ModuleList([
Sequential(
Conv2d(in_channels=1024, out_channels=256, kernel_size=1),
ReLU(),
Conv2d(in_channels=256, out_channels=256, kernel_size=3, stride=2, padding=1),
ReLU()
),
Sequential(
Conv2d(in_channels=256, out_channels=128, kernel_size=1),
ReLU(),
Conv2d(in_channels=128, out_channels=256, kernel_size=3, stride=2, padding=1),
ReLU()
),
Sequential(
Conv2d(in_channels=256, out_channels=128, kernel_size=1),
ReLU(),
Conv2d(in_channels=128, out_channels=256, kernel_size=3, stride=2, padding=1),
ReLU()
),
Sequential(
Conv2d(in_channels=256, out_channels=128, kernel_size=1),
ReLU(),
Conv2d(in_channels=128, out_channels=256, kernel_size=3, stride=2, padding=1),
ReLU()
)
])
regression_headers = ModuleList([
Conv2d(in_channels=256, out_channels=6 * 4, kernel_size=3, padding=1),
Conv2d(in_channels=256, out_channels=6 * 4, kernel_size=3, padding=1),
Conv2d(in_channels=256, out_channels=6 * 4, kernel_size=3, padding=1),
Conv2d(in_channels=256, out_channels=6 * 4, kernel_size=3, padding=1),
Conv2d(in_channels=256, out_channels=6 * 4, kernel_size=3, padding=1),
Conv2d(in_channels=256, out_channels=6 * 4, kernel_size=3, padding=1),
# TODO: change to kernel_size=1, padding=0?
])
classification_headers = ModuleList([
Conv2d(in_channels=256, out_channels=6 * num_classes, kernel_size=3, padding=1),
Conv2d(in_channels=256, out_channels=6 * num_classes, kernel_size=3, padding=1),
Conv2d(in_channels=256, out_channels=6 * num_classes, kernel_size=3, padding=1),
Conv2d(in_channels=256, out_channels=6 * num_classes, kernel_size=3, padding=1),
Conv2d(in_channels=256, out_channels=6 * num_classes, kernel_size=3, padding=1),
Conv2d(in_channels=256, out_channels=6 * num_classes, kernel_size=3, padding=1),
# TODO: change to kernel_size=1, padding=0?
])
return FPNSSD(num_classes, base_net, source_layer_indexes,
extras, classification_headers, regression_headers)
def create_fpn_mobilenetv1_ssd_predictor(net, candidate_size=200, nms_method=None, sigma=0.5,
device=torch.device('cpu')):
# pass image_std positionally: Predictor's fourth parameter is std, so the stray
# priors/variance arguments would collide with the keyword arguments below
predictor = Predictor(net, config.image_size, config.image_mean,
config.image_std,
nms_method=nms_method,
iou_threshold=config.iou_threshold,
candidate_size=candidate_size,
sigma=sigma,
device=device)
return predictor

143
src/vision/ssd/fpn_ssd.py Normal file
View File

@ -0,0 +1,143 @@
from typing import List, Tuple
import numpy as np
import torch
import torch.nn as nn
from ..utils import box_utils
class FPNSSD(nn.Module):
def __init__(self, num_classes: int, base_net: nn.ModuleList, source_layer_indexes: List[int],
extras: nn.ModuleList, classification_headers: nn.ModuleList,
regression_headers: nn.ModuleList, upsample_mode="nearest"):
"""Compose a SSD model using the given components.
"""
super(FPNSSD, self).__init__()
self.num_classes = num_classes
self.base_net = base_net
self.source_layer_indexes = source_layer_indexes
self.extras = extras
self.classification_headers = classification_headers
self.regression_headers = regression_headers
self.upsample_mode = upsample_mode
# register layers in source_layer_indexes by adding them to a module list
self.source_layer_add_ons = nn.ModuleList([t[1] for t in source_layer_indexes if isinstance(t, tuple)])
self.upsamplers = [
nn.Upsample(size=(19, 19), mode='bilinear'),
nn.Upsample(size=(10, 10), mode='bilinear'),
nn.Upsample(size=(5, 5), mode='bilinear'),
nn.Upsample(size=(3, 3), mode='bilinear'),
nn.Upsample(size=(2, 2), mode='bilinear'),
]
def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
confidences = []
locations = []
start_layer_index = 0
header_index = 0
features = []
for end_layer_index in self.source_layer_indexes:
if isinstance(end_layer_index, tuple):
added_layer = end_layer_index[1]
end_layer_index = end_layer_index[0]
else:
added_layer = None
for layer in self.base_net[start_layer_index: end_layer_index]:
x = layer(x)
start_layer_index = end_layer_index
if added_layer:
y = added_layer(x)
else:
y = x
# confidence, location = self.compute_header(header_index, y)
features.append(y)
header_index += 1
# confidences.append(confidence)
# locations.append(location)
for layer in self.base_net[end_layer_index:]:
x = layer(x)
for layer in self.extras:
x = layer(x)
# confidence, location = self.compute_header(header_index, x)
features.append(x)
header_index += 1
# confidences.append(confidence)
# locations.append(location)
upstream_feature = None
for i in range(len(features) - 1, -1, -1):
feature = features[i]
if upstream_feature is not None:
upstream_feature = self.upsamplers[i](upstream_feature)
upstream_feature += feature
else:
upstream_feature = feature
confidence, location = self.compute_header(i, upstream_feature)
confidences.append(confidence)
locations.append(location)
confidences = torch.cat(confidences, 1)
locations = torch.cat(locations, 1)
return confidences, locations
def compute_header(self, i, x):
confidence = self.classification_headers[i](x)
confidence = confidence.permute(0, 2, 3, 1).contiguous()
confidence = confidence.view(confidence.size(0), -1, self.num_classes)
location = self.regression_headers[i](x)
location = location.permute(0, 2, 3, 1).contiguous()
location = location.view(location.size(0), -1, 4)
return confidence, location
def init_from_base_net(self, model):
self.base_net.load_state_dict(torch.load(model, map_location=lambda storage, loc: storage), strict=False)
self.source_layer_add_ons.apply(_xavier_init_)
self.extras.apply(_xavier_init_)
self.classification_headers.apply(_xavier_init_)
self.regression_headers.apply(_xavier_init_)
def init(self):
self.base_net.apply(_xavier_init_)
self.source_layer_add_ons.apply(_xavier_init_)
self.extras.apply(_xavier_init_)
self.classification_headers.apply(_xavier_init_)
self.regression_headers.apply(_xavier_init_)
def load(self, model):
self.load_state_dict(torch.load(model, map_location=lambda storage, loc: storage))
def save(self, model_path):
torch.save(self.state_dict(), model_path)
class MatchPrior(object):
def __init__(self, center_form_priors, center_variance, size_variance, iou_threshold):
self.center_form_priors = center_form_priors
self.corner_form_priors = box_utils.center_form_to_corner_form(center_form_priors)
self.center_variance = center_variance
self.size_variance = size_variance
self.iou_threshold = iou_threshold
def __call__(self, gt_boxes, gt_labels):
if type(gt_boxes) is np.ndarray:
gt_boxes = torch.from_numpy(gt_boxes)
if type(gt_labels) is np.ndarray:
gt_labels = torch.from_numpy(gt_labels)
boxes, labels = box_utils.assign_priors(gt_boxes, gt_labels,
self.corner_form_priors, self.iou_threshold)
boxes = box_utils.corner_form_to_center_form(boxes)
locations = box_utils.convert_boxes_to_locations(boxes, self.center_form_priors, self.center_variance,
self.size_variance)
return locations, labels
def _xavier_init_(m: nn.Module):
if isinstance(m, nn.Conv2d):
nn.init.xavier_uniform_(m.weight)

71
src/vision/ssd/mobilenet_v2_ssd_lite.py Normal file

@ -0,0 +1,71 @@
import torch
from torch import nn
from torch.nn import Conv2d, Sequential, ModuleList, BatchNorm2d
from .config import mobilenetv1_ssd_config as config
from .predictor import Predictor
from .ssd import SSD, GraphPath
from ..nn.mobilenet_v2 import MobileNetV2, InvertedResidual
def SeperableConv2d(in_channels, out_channels, kernel_size=1, stride=1, padding=0, onnx_compatible=False):
"""Replace Conv2d with a depthwise Conv2d and Pointwise Conv2d.
"""
ReLU = nn.ReLU if onnx_compatible else nn.ReLU6
return Sequential(
Conv2d(in_channels=in_channels, out_channels=in_channels, kernel_size=kernel_size,
groups=in_channels, stride=stride, padding=padding),
BatchNorm2d(in_channels),
ReLU(),
Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=1),
)
def create_mobilenetv2_ssd_lite(num_classes, width_mult=1.0, use_batch_norm=True, onnx_compatible=False, is_test=False):
base_net = MobileNetV2(width_mult=width_mult, use_batch_norm=use_batch_norm,
onnx_compatible=onnx_compatible).features
source_layer_indexes = [
GraphPath(14, 'conv', 3),
19,
]
extras = ModuleList([
InvertedResidual(1280, 512, stride=2, expand_ratio=0.2),
InvertedResidual(512, 256, stride=2, expand_ratio=0.25),
InvertedResidual(256, 256, stride=2, expand_ratio=0.5),
InvertedResidual(256, 64, stride=2, expand_ratio=0.25)
])
regression_headers = ModuleList([
SeperableConv2d(in_channels=round(576 * width_mult), out_channels=6 * 4,
kernel_size=3, padding=1, onnx_compatible=False),
SeperableConv2d(in_channels=1280, out_channels=6 * 4, kernel_size=3, padding=1, onnx_compatible=False),
SeperableConv2d(in_channels=512, out_channels=6 * 4, kernel_size=3, padding=1, onnx_compatible=False),
SeperableConv2d(in_channels=256, out_channels=6 * 4, kernel_size=3, padding=1, onnx_compatible=False),
SeperableConv2d(in_channels=256, out_channels=6 * 4, kernel_size=3, padding=1, onnx_compatible=False),
Conv2d(in_channels=64, out_channels=6 * 4, kernel_size=1),
])
classification_headers = ModuleList([
SeperableConv2d(in_channels=round(576 * width_mult), out_channels=6 * num_classes, kernel_size=3, padding=1),
SeperableConv2d(in_channels=1280, out_channels=6 * num_classes, kernel_size=3, padding=1),
SeperableConv2d(in_channels=512, out_channels=6 * num_classes, kernel_size=3, padding=1),
SeperableConv2d(in_channels=256, out_channels=6 * num_classes, kernel_size=3, padding=1),
SeperableConv2d(in_channels=256, out_channels=6 * num_classes, kernel_size=3, padding=1),
Conv2d(in_channels=64, out_channels=6 * num_classes, kernel_size=1),
])
return SSD(num_classes, base_net, source_layer_indexes,
extras, classification_headers, regression_headers, is_test=is_test, config=config)
def create_mobilenetv2_ssd_lite_predictor(net, candidate_size=200, nms_method=None, sigma=0.5,
device=torch.device('cpu')):
predictor = Predictor(net, config.image_size, config.image_mean,
config.image_std,
nms_method=nms_method,
iou_threshold=config.iou_threshold,
candidate_size=candidate_size,
sigma=sigma,
device=device)
return predictor

75
src/vision/ssd/mobilenetv1_ssd.py Normal file

@ -0,0 +1,75 @@
from torch.nn import Conv2d, Sequential, ModuleList, ReLU
from .config import mobilenetv1_ssd_config as config
from .predictor import Predictor
from .ssd import SSD
from ..nn.mobilenet import MobileNetV1
def create_mobilenetv1_ssd(num_classes, is_test=False):
base_net = MobileNetV1(1001).model # disable dropout layer
source_layer_indexes = [
12,
14,
]
extras = ModuleList([
Sequential(
Conv2d(in_channels=1024, out_channels=256, kernel_size=1),
ReLU(),
Conv2d(in_channels=256, out_channels=512, kernel_size=3, stride=2, padding=1),
ReLU()
),
Sequential(
Conv2d(in_channels=512, out_channels=128, kernel_size=1),
ReLU(),
Conv2d(in_channels=128, out_channels=256, kernel_size=3, stride=2, padding=1),
ReLU()
),
Sequential(
Conv2d(in_channels=256, out_channels=128, kernel_size=1),
ReLU(),
Conv2d(in_channels=128, out_channels=256, kernel_size=3, stride=2, padding=1),
ReLU()
),
Sequential(
Conv2d(in_channels=256, out_channels=128, kernel_size=1),
ReLU(),
Conv2d(in_channels=128, out_channels=256, kernel_size=3, stride=2, padding=1),
ReLU()
)
])
regression_headers = ModuleList([
Conv2d(in_channels=512, out_channels=6 * 4, kernel_size=3, padding=1),
Conv2d(in_channels=1024, out_channels=6 * 4, kernel_size=3, padding=1),
Conv2d(in_channels=512, out_channels=6 * 4, kernel_size=3, padding=1),
Conv2d(in_channels=256, out_channels=6 * 4, kernel_size=3, padding=1),
Conv2d(in_channels=256, out_channels=6 * 4, kernel_size=3, padding=1),
Conv2d(in_channels=256, out_channels=6 * 4, kernel_size=3, padding=1),
# TODO: change to kernel_size=1, padding=0?
])
classification_headers = ModuleList([
Conv2d(in_channels=512, out_channels=6 * num_classes, kernel_size=3, padding=1),
Conv2d(in_channels=1024, out_channels=6 * num_classes, kernel_size=3, padding=1),
Conv2d(in_channels=512, out_channels=6 * num_classes, kernel_size=3, padding=1),
Conv2d(in_channels=256, out_channels=6 * num_classes, kernel_size=3, padding=1),
Conv2d(in_channels=256, out_channels=6 * num_classes, kernel_size=3, padding=1),
Conv2d(in_channels=256, out_channels=6 * num_classes, kernel_size=3, padding=1),
# TODO: change to kernel_size=1, padding=0?
])
return SSD(num_classes, base_net, source_layer_indexes,
extras, classification_headers, regression_headers, is_test=is_test, config=config)
def create_mobilenetv1_ssd_predictor(net, candidate_size=200, nms_method=None, sigma=0.5, device=None):
predictor = Predictor(net, config.image_size, config.image_mean,
config.image_std,
nms_method=nms_method,
iou_threshold=config.iou_threshold,
candidate_size=candidate_size,
sigma=sigma,
device=device)
return predictor
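
An end-to-end sketch with this factory (the checkpoint path and class count are illustrative; num_classes includes the background class):

net = create_mobilenetv1_ssd(num_classes=21, is_test=True)
net.load('models/mb1-ssd.pth')  # hypothetical checkpoint path
predictor = create_mobilenetv1_ssd_predictor(net, candidate_size=200)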

80
src/vision/ssd/mobilenetv1_ssd_lite.py Normal file

@ -0,0 +1,80 @@
from torch.nn import Conv2d, Sequential, ModuleList, ReLU
from .config import mobilenetv1_ssd_config as config
from .predictor import Predictor
from .ssd import SSD
from ..nn.mobilenet import MobileNetV1
def SeperableConv2d(in_channels, out_channels, kernel_size=1, stride=1, padding=0):
"""Replace Conv2d with a depthwise Conv2d and Pointwise Conv2d.
"""
return Sequential(
Conv2d(in_channels=in_channels, out_channels=in_channels, kernel_size=kernel_size,
groups=in_channels, stride=stride, padding=padding),
ReLU(),
Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=1),
)
def create_mobilenetv1_ssd_lite(num_classes, is_test=False):
base_net = MobileNetV1(1001).model # disable dropout layer
source_layer_indexes = [
12,
14,
]
extras = ModuleList([
Sequential(
Conv2d(in_channels=1024, out_channels=256, kernel_size=1),
ReLU(),
SeperableConv2d(in_channels=256, out_channels=512, kernel_size=3, stride=2, padding=1),
),
Sequential(
Conv2d(in_channels=512, out_channels=128, kernel_size=1),
ReLU(),
SeperableConv2d(in_channels=128, out_channels=256, kernel_size=3, stride=2, padding=1),
),
Sequential(
Conv2d(in_channels=256, out_channels=128, kernel_size=1),
ReLU(),
SeperableConv2d(in_channels=128, out_channels=256, kernel_size=3, stride=2, padding=1),
),
Sequential(
Conv2d(in_channels=256, out_channels=128, kernel_size=1),
ReLU(),
SeperableConv2d(in_channels=128, out_channels=256, kernel_size=3, stride=2, padding=1)
)
])
regression_headers = ModuleList([
SeperableConv2d(in_channels=512, out_channels=6 * 4, kernel_size=3, padding=1),
SeperableConv2d(in_channels=1024, out_channels=6 * 4, kernel_size=3, padding=1),
SeperableConv2d(in_channels=512, out_channels=6 * 4, kernel_size=3, padding=1),
SeperableConv2d(in_channels=256, out_channels=6 * 4, kernel_size=3, padding=1),
SeperableConv2d(in_channels=256, out_channels=6 * 4, kernel_size=3, padding=1),
Conv2d(in_channels=256, out_channels=6 * 4, kernel_size=1),
])
classification_headers = ModuleList([
SeperableConv2d(in_channels=512, out_channels=6 * num_classes, kernel_size=3, padding=1),
SeperableConv2d(in_channels=1024, out_channels=6 * num_classes, kernel_size=3, padding=1),
SeperableConv2d(in_channels=512, out_channels=6 * num_classes, kernel_size=3, padding=1),
SeperableConv2d(in_channels=256, out_channels=6 * num_classes, kernel_size=3, padding=1),
SeperableConv2d(in_channels=256, out_channels=6 * num_classes, kernel_size=3, padding=1),
Conv2d(in_channels=256, out_channels=6 * num_classes, kernel_size=1),
])
return SSD(num_classes, base_net, source_layer_indexes,
extras, classification_headers, regression_headers, is_test=is_test, config=config)
def create_mobilenetv1_ssd_lite_predictor(net, candidate_size=200, nms_method=None, sigma=0.5, device=None):
predictor = Predictor(net, config.image_size, config.image_mean,
config.image_std,
nms_method=nms_method,
iou_threshold=config.iou_threshold,
candidate_size=candidate_size,
sigma=sigma,
device=device)
return predictor
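
SeperableConv2d factors a dense k x k convolution into a depthwise k x k plus a pointwise 1 x 1: for 512 -> 512 channels at k=3 that is 512*9 + 512*512 (about 268k) weights instead of 512*512*9 (about 2.36M). A quick count:

conv = SeperableConv2d(512, 512, kernel_size=3, padding=1)
print(sum(p.numel() for p in conv.parameters()))  # ~268k including biases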

73
src/vision/ssd/predictor.py Normal file

@ -0,0 +1,73 @@
import torch
from .data_preprocessing import PredictionTransform
from ..utils import box_utils
from ..utils.misc import Timer
class Predictor:
def __init__(self, net, size, mean=0.0, std=1.0, nms_method=None,
iou_threshold=0.45, filter_threshold=0.01, candidate_size=200, sigma=0.5, device=None):
self.net = net
self.transform = PredictionTransform(size, mean, std)
self.iou_threshold = iou_threshold
self.filter_threshold = filter_threshold
self.candidate_size = candidate_size
self.nms_method = nms_method
self.sigma = sigma
if device:
self.device = device
else:
self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
self.net.to(self.device)
self.net.eval()
self.timer = Timer()
def predict(self, image, top_k=-1, prob_threshold=None):
cpu_device = torch.device("cpu")
height, width, _ = image.shape
image = self.transform(image)
# print(image)
images = image.unsqueeze(0)
images = images.to(self.device)
with torch.no_grad():
self.timer.start()
scores, boxes = self.net.forward(images)
print("Inference time: ", self.timer.end())
boxes = boxes[0]
scores = scores[0]
if not prob_threshold:
prob_threshold = self.filter_threshold
boxes = boxes.to(cpu_device)
scores = scores.to(cpu_device)
picked_box_probs = []
picked_labels = []
for class_index in range(1, scores.size(1)):
probs = scores[:, class_index]
mask = probs > prob_threshold
probs = probs[mask]
if probs.size(0) == 0:
continue
subset_boxes = boxes[mask, :]
box_probs = torch.cat([subset_boxes, probs.reshape(-1, 1)], dim=1)
box_probs = box_utils.nms(box_probs, self.nms_method,
score_threshold=prob_threshold,
iou_threshold=self.iou_threshold,
sigma=self.sigma,
top_k=top_k,
candidate_size=self.candidate_size)
picked_box_probs.append(box_probs)
picked_labels.extend([class_index] * box_probs.size(0))
if not picked_box_probs:
return torch.tensor([]), torch.tensor([]), torch.tensor([])
picked_box_probs = torch.cat(picked_box_probs)
picked_box_probs[:, 0] *= width
picked_box_probs[:, 1] *= height
picked_box_probs[:, 2] *= width
picked_box_probs[:, 3] *= height
return picked_box_probs[:, :4], torch.tensor(picked_labels), picked_box_probs[:, 4]
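
A hedged sketch of Predictor.predict: it expects an RGB image array and returns corner-form boxes scaled back to pixel coordinates, plus labels and scores (`predictor` is assumed to come from one of the factory functions in this package; the image path is illustrative):

import cv2
image = cv2.cvtColor(cv2.imread('example.jpg'), cv2.COLOR_BGR2RGB)
boxes, labels, probs = predictor.predict(image, top_k=10, prob_threshold=0.4)
for box, label, prob in zip(boxes, labels, probs):
    print(label.item(), prob.item(), box.tolist())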

86
src/vision/ssd/squeezenet_ssd_lite.py Normal file

@ -0,0 +1,86 @@
import torch
from torch.nn import Conv2d, Sequential, ModuleList, ReLU
from .config import squeezenet_ssd_config as config
from .predictor import Predictor
from .ssd import SSD
from ..nn.squeezenet import squeezenet1_1
def SeperableConv2d(in_channels, out_channels, kernel_size=1, stride=1, padding=0):
"""Replace Conv2d with a depthwise Conv2d and Pointwise Conv2d.
"""
return Sequential(
Conv2d(in_channels=in_channels, out_channels=in_channels, kernel_size=kernel_size,
groups=in_channels, stride=stride, padding=padding),
ReLU(),
Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=1),
)
def create_squeezenet_ssd_lite(num_classes, is_test=False):
base_net = squeezenet1_1(False).features # disable dropout layer
source_layer_indexes = [
12
]
extras = ModuleList([
Sequential(
Conv2d(in_channels=512, out_channels=256, kernel_size=1),
ReLU(),
SeperableConv2d(in_channels=256, out_channels=512, kernel_size=3, stride=2, padding=2),
),
Sequential(
Conv2d(in_channels=512, out_channels=256, kernel_size=1),
ReLU(),
SeperableConv2d(in_channels=256, out_channels=512, kernel_size=3, stride=2, padding=1),
),
Sequential(
Conv2d(in_channels=512, out_channels=128, kernel_size=1),
ReLU(),
SeperableConv2d(in_channels=128, out_channels=256, kernel_size=3, stride=2, padding=1),
),
Sequential(
Conv2d(in_channels=256, out_channels=128, kernel_size=1),
ReLU(),
SeperableConv2d(in_channels=128, out_channels=256, kernel_size=3, stride=2, padding=1),
),
Sequential(
Conv2d(in_channels=256, out_channels=128, kernel_size=1),
ReLU(),
SeperableConv2d(in_channels=128, out_channels=256, kernel_size=3, stride=2, padding=1)
)
])
regression_headers = ModuleList([
SeperableConv2d(in_channels=512, out_channels=6 * 4, kernel_size=3, padding=1),
SeperableConv2d(in_channels=512, out_channels=6 * 4, kernel_size=3, padding=1),
SeperableConv2d(in_channels=512, out_channels=6 * 4, kernel_size=3, padding=1),
SeperableConv2d(in_channels=256, out_channels=6 * 4, kernel_size=3, padding=1),
SeperableConv2d(in_channels=256, out_channels=6 * 4, kernel_size=3, padding=1),
Conv2d(in_channels=256, out_channels=6 * 4, kernel_size=1),
])
classification_headers = ModuleList([
SeperableConv2d(in_channels=512, out_channels=6 * num_classes, kernel_size=3, padding=1),
SeperableConv2d(in_channels=512, out_channels=6 * num_classes, kernel_size=3, padding=1),
SeperableConv2d(in_channels=512, out_channels=6 * num_classes, kernel_size=3, padding=1),
SeperableConv2d(in_channels=256, out_channels=6 * num_classes, kernel_size=3, padding=1),
SeperableConv2d(in_channels=256, out_channels=6 * num_classes, kernel_size=3, padding=1),
Conv2d(in_channels=256, out_channels=6 * num_classes, kernel_size=1),
])
return SSD(num_classes, base_net, source_layer_indexes,
extras, classification_headers, regression_headers, is_test=is_test, config=config)
def create_squeezenet_ssd_lite_predictor(net, candidate_size=200, nms_method=None, sigma=0.5,
device=torch.device('cpu')):
predictor = Predictor(net, config.image_size, config.image_mean,
config.image_std,
nms_method=nms_method,
iou_threshold=config.iou_threshold,
candidate_size=candidate_size,
sigma=sigma,
device=device)
return predictor

167
src/vision/ssd/ssd.py Normal file

@ -0,0 +1,167 @@
from collections import namedtuple
from typing import List, Tuple
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from ..utils import box_utils
GraphPath = namedtuple("GraphPath", ['s0', 'name', 's1'])
class SSD(nn.Module):
def __init__(self, num_classes: int, base_net: nn.ModuleList, source_layer_indexes: List[int],
extras: nn.ModuleList, classification_headers: nn.ModuleList,
regression_headers: nn.ModuleList, is_test=False, config=None, device=None):
"""Compose a SSD model using the given components.
"""
super(SSD, self).__init__()
self.num_classes = num_classes
self.base_net = base_net
self.source_layer_indexes = source_layer_indexes
self.extras = extras
self.classification_headers = classification_headers
self.regression_headers = regression_headers
self.is_test = is_test
self.config = config
# register layers in source_layer_indexes by adding them to a module list
self.source_layer_add_ons = nn.ModuleList([t[1] for t in source_layer_indexes
if isinstance(t, tuple) and not isinstance(t, GraphPath)])
if device:
self.device = device
else:
self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
if is_test:
self.config = config
self.priors = config.priors.to(self.device)
def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
confidences = []
locations = []
start_layer_index = 0
header_index = 0
for end_layer_index in self.source_layer_indexes:
if isinstance(end_layer_index, GraphPath):
path = end_layer_index
end_layer_index = end_layer_index.s0
added_layer = None
elif isinstance(end_layer_index, tuple):
added_layer = end_layer_index[1]
end_layer_index = end_layer_index[0]
path = None
else:
added_layer = None
path = None
for layer in self.base_net[start_layer_index: end_layer_index]:
x = layer(x)
if added_layer:
y = added_layer(x)
else:
y = x
if path:
sub = getattr(self.base_net[end_layer_index], path.name)
for layer in sub[:path.s1]:
x = layer(x)
y = x
for layer in sub[path.s1:]:
x = layer(x)
end_layer_index += 1
start_layer_index = end_layer_index
confidence, location = self.compute_header(header_index, y)
header_index += 1
confidences.append(confidence)
locations.append(location)
for layer in self.base_net[end_layer_index:]:
x = layer(x)
for layer in self.extras:
x = layer(x)
confidence, location = self.compute_header(header_index, x)
header_index += 1
confidences.append(confidence)
locations.append(location)
confidences = torch.cat(confidences, 1)
locations = torch.cat(locations, 1)
if self.is_test:
confidences = F.softmax(confidences, dim=2)
boxes = box_utils.convert_locations_to_boxes(
locations, self.priors, self.config.center_variance, self.config.size_variance
)
boxes = box_utils.center_form_to_corner_form(boxes)
return confidences, boxes
else:
return confidences, locations
def compute_header(self, i, x):
confidence = self.classification_headers[i](x)
confidence = confidence.permute(0, 2, 3, 1).contiguous()
confidence = confidence.view(confidence.size(0), -1, self.num_classes)
location = self.regression_headers[i](x)
location = location.permute(0, 2, 3, 1).contiguous()
location = location.view(location.size(0), -1, 4)
return confidence, location
def init_from_base_net(self, model):
self.base_net.load_state_dict(torch.load(model, map_location=lambda storage, loc: storage), strict=True)
self.source_layer_add_ons.apply(_xavier_init_)
self.extras.apply(_xavier_init_)
self.classification_headers.apply(_xavier_init_)
self.regression_headers.apply(_xavier_init_)
def init_from_pretrained_ssd(self, model):
state_dict = torch.load(model, map_location=lambda storage, loc: storage)
state_dict = {k: v for k, v in state_dict.items() if
not (k.startswith("classification_headers") or k.startswith("regression_headers"))}
model_dict = self.state_dict()
model_dict.update(state_dict)
self.load_state_dict(model_dict)
self.classification_headers.apply(_xavier_init_)
self.regression_headers.apply(_xavier_init_)
def init(self):
self.base_net.apply(_xavier_init_)
self.source_layer_add_ons.apply(_xavier_init_)
self.extras.apply(_xavier_init_)
self.classification_headers.apply(_xavier_init_)
self.regression_headers.apply(_xavier_init_)
def load(self, model):
self.load_state_dict(torch.load(model, map_location=lambda storage, loc: storage))
def save(self, model_path):
torch.save(self.state_dict(), model_path)
class MatchPrior(object):
def __init__(self, center_form_priors, center_variance, size_variance, iou_threshold):
self.center_form_priors = center_form_priors
self.corner_form_priors = box_utils.center_form_to_corner_form(center_form_priors)
self.center_variance = center_variance
self.size_variance = size_variance
self.iou_threshold = iou_threshold
def __call__(self, gt_boxes, gt_labels):
if type(gt_boxes) is np.ndarray:
gt_boxes = torch.from_numpy(gt_boxes)
if type(gt_labels) is np.ndarray:
gt_labels = torch.from_numpy(gt_labels)
boxes, labels = box_utils.assign_priors(gt_boxes, gt_labels,
self.corner_form_priors, self.iou_threshold)
boxes = box_utils.corner_form_to_center_form(boxes)
locations = box_utils.convert_boxes_to_locations(boxes, self.center_form_priors, self.center_variance,
self.size_variance)
return locations, labels
def _xavier_init_(m: nn.Module):
if isinstance(m, nn.Conv2d):
nn.init.xavier_uniform_(m.weight)
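
MatchPrior is the target transform used at training time: it matches ground-truth boxes to priors and encodes them as regression targets. A hedged sketch (gt_boxes/gt_labels are illustrative numpy arrays in corner form, percent coordinates):

import numpy as np
from vision.ssd.config import mobilenetv1_ssd_config as config
target_transform = MatchPrior(config.priors, config.center_variance, config.size_variance, iou_threshold=0.5)
gt_boxes = np.array([[0.1, 0.2, 0.4, 0.6]], dtype=np.float32)
gt_labels = np.array([1], dtype=np.int64)
locations, labels = target_transform(gt_boxes, gt_labels)  # (num_priors, 4) targets and per-prior labels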

76
src/vision/ssd/vgg_ssd.py Normal file

@ -0,0 +1,76 @@
from torch.nn import Conv2d, Sequential, ModuleList, ReLU, BatchNorm2d
from .config import vgg_ssd_config as config
from .predictor import Predictor
from .ssd import SSD
from ..nn.vgg import vgg
def create_vgg_ssd(num_classes, is_test=False):
vgg_config = [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 'C', 512, 512, 512, 'M',
512, 512, 512]
base_net = ModuleList(vgg(vgg_config))
source_layer_indexes = [
(23, BatchNorm2d(512)),
len(base_net),
]
extras = ModuleList([
Sequential(
Conv2d(in_channels=1024, out_channels=256, kernel_size=1),
ReLU(),
Conv2d(in_channels=256, out_channels=512, kernel_size=3, stride=2, padding=1),
ReLU()
),
Sequential(
Conv2d(in_channels=512, out_channels=128, kernel_size=1),
ReLU(),
Conv2d(in_channels=128, out_channels=256, kernel_size=3, stride=2, padding=1),
ReLU()
),
Sequential(
Conv2d(in_channels=256, out_channels=128, kernel_size=1),
ReLU(),
Conv2d(in_channels=128, out_channels=256, kernel_size=3),
ReLU()
),
Sequential(
Conv2d(in_channels=256, out_channels=128, kernel_size=1),
ReLU(),
Conv2d(in_channels=128, out_channels=256, kernel_size=3),
ReLU()
)
])
regression_headers = ModuleList([
Conv2d(in_channels=512, out_channels=4 * 4, kernel_size=3, padding=1),
Conv2d(in_channels=1024, out_channels=6 * 4, kernel_size=3, padding=1),
Conv2d(in_channels=512, out_channels=6 * 4, kernel_size=3, padding=1),
Conv2d(in_channels=256, out_channels=6 * 4, kernel_size=3, padding=1),
Conv2d(in_channels=256, out_channels=4 * 4, kernel_size=3, padding=1),
Conv2d(in_channels=256, out_channels=4 * 4, kernel_size=3, padding=1),
# TODO: change to kernel_size=1, padding=0?
])
classification_headers = ModuleList([
Conv2d(in_channels=512, out_channels=4 * num_classes, kernel_size=3, padding=1),
Conv2d(in_channels=1024, out_channels=6 * num_classes, kernel_size=3, padding=1),
Conv2d(in_channels=512, out_channels=6 * num_classes, kernel_size=3, padding=1),
Conv2d(in_channels=256, out_channels=6 * num_classes, kernel_size=3, padding=1),
Conv2d(in_channels=256, out_channels=4 * num_classes, kernel_size=3, padding=1),
Conv2d(in_channels=256, out_channels=4 * num_classes, kernel_size=3, padding=1),
# TODO: change to kernel_size=1, padding=0?
])
return SSD(num_classes, base_net, source_layer_indexes,
extras, classification_headers, regression_headers, is_test=is_test, config=config)
def create_vgg_ssd_predictor(net, candidate_size=200, nms_method=None, sigma=0.5, device=None):
predictor = Predictor(net, config.image_size, config.image_mean,
nms_method=nms_method,
iou_threshold=config.iou_threshold,
candidate_size=candidate_size,
sigma=sigma,
device=device)
return predictor

Binary image file added (85 KiB); not shown.

49
src/vision/test/test_vgg_ssd.py Normal file

@ -0,0 +1,49 @@
import tempfile
import torch
from ..ssd.vgg_ssd import create_vgg_ssd
def test_create_vgg_ssd():
for num_classes in [2, 10, 21, 100]:
_ = create_vgg_ssd(num_classes)
def test_forward():
for num_classes in [2]:
net = create_vgg_ssd(num_classes)
net.init()
net.eval()
x = torch.randn(2, 3, 300, 300)
confidences, locations = net.forward(x)
assert confidences.size() == torch.Size([2, 8732, num_classes])
assert locations.size() == torch.Size([2, 8732, 4])
assert confidences.nonzero().size(0) != 0
assert locations.nonzero().size(0) != 0
def test_save_model():
net = create_vgg_ssd(10)
net.init()
with tempfile.TemporaryFile() as f:
net.save(f)
def test_save_load_model_consistency():
net = create_vgg_ssd(20)
net.init()
model_path = tempfile.NamedTemporaryFile().name
net.save(model_path)
net_copy = create_vgg_ssd(20)
net_copy.load(model_path)
net.eval()
net_copy.eval()
for _ in range(1):
x = torch.randn(1, 3, 300, 300)
confidences1, locations1 = net.forward(x)
confidences2, locations2 = net_copy.forward(x)
assert (confidences1 == confidences2).long().sum() == confidences2.numel()
assert (locations1 == locations2).long().sum() == locations2.numel()

410
src/vision/transforms/transforms.py Normal file

@ -0,0 +1,410 @@
# from https://github.com/amdegroot/ssd.pytorch
import types
import cv2
import numpy as np
import torch
from numpy import random
from torchvision import transforms
def intersect(box_a, box_b):
max_xy = np.minimum(box_a[:, 2:], box_b[2:])
min_xy = np.maximum(box_a[:, :2], box_b[:2])
inter = np.clip((max_xy - min_xy), a_min=0, a_max=np.inf)
return inter[:, 0] * inter[:, 1]
def jaccard_numpy(box_a, box_b):
"""Compute the jaccard overlap of two sets of boxes. The jaccard overlap
is simply the intersection over union of two boxes.
E.g.:
A B / A B = A B / (area(A) + area(B) - A B)
Args:
box_a: Multiple bounding boxes, Shape: [num_boxes,4]
box_b: Single bounding box, Shape: [4]
Return:
jaccard overlap: Shape: [box_a.shape[0], box_a.shape[1]]
"""
inter = intersect(box_a, box_b)
area_a = ((box_a[:, 2] - box_a[:, 0]) *
(box_a[:, 3] - box_a[:, 1])) # [A,B]
area_b = ((box_b[2] - box_b[0]) *
(box_b[3] - box_b[1])) # [A,B]
union = area_a + area_b - inter
return inter / union # [A,B]
class Compose(object):
"""Composes several augmentations together.
Args:
transforms (List[Transform]): list of transforms to compose.
Example:
>>> augmentations.Compose([
>>> transforms.CenterCrop(10),
>>> transforms.ToTensor(),
>>> ])
"""
def __init__(self, transforms):
self.transforms = transforms
def __call__(self, img, boxes=None, labels=None):
for t in self.transforms:
img, boxes, labels = t(img, boxes, labels)
return img, boxes, labels
class Lambda(object):
"""Applies a lambda as a transform."""
def __init__(self, lambd):
assert isinstance(lambd, types.LambdaType)
self.lambd = lambd
def __call__(self, img, boxes=None, labels=None):
return self.lambd(img, boxes, labels)
class ConvertFromInts(object):
def __call__(self, image, boxes=None, labels=None):
return image.astype(np.float32), boxes, labels
class SubtractMeans(object):
def __init__(self, mean):
self.mean = np.array(mean, dtype=np.float32)
def __call__(self, image, boxes=None, labels=None):
image = image.astype(np.float32)
image -= self.mean
return image.astype(np.float32), boxes, labels
class ToAbsoluteCoords(object):
def __call__(self, image, boxes=None, labels=None):
height, width, channels = image.shape
boxes[:, 0] *= width
boxes[:, 2] *= width
boxes[:, 1] *= height
boxes[:, 3] *= height
return image, boxes, labels
class ToPercentCoords(object):
def __call__(self, image, boxes=None, labels=None):
height, width, channels = image.shape
boxes[:, 0] /= width
boxes[:, 2] /= width
boxes[:, 1] /= height
boxes[:, 3] /= height
return image, boxes, labels
class Resize(object):
def __init__(self, size=300):
self.size = size
def __call__(self, image, boxes=None, labels=None):
image = cv2.resize(image, (self.size,
self.size))
return image, boxes, labels
class RandomSaturation(object):
def __init__(self, lower=0.5, upper=1.5):
self.lower = lower
self.upper = upper
assert self.upper >= self.lower, "contrast upper must be >= lower."
assert self.lower >= 0, "contrast lower must be non-negative."
def __call__(self, image, boxes=None, labels=None):
if random.randint(2):
image[:, :, 1] *= random.uniform(self.lower, self.upper)
return image, boxes, labels
class RandomHue(object):
def __init__(self, delta=18.0):
assert delta >= 0.0 and delta <= 360.0
self.delta = delta
def __call__(self, image, boxes=None, labels=None):
if random.randint(2):
image[:, :, 0] += random.uniform(-self.delta, self.delta)
image[:, :, 0][image[:, :, 0] > 360.0] -= 360.0
image[:, :, 0][image[:, :, 0] < 0.0] += 360.0
return image, boxes, labels
class RandomLightingNoise(object):
def __init__(self):
self.perms = ((0, 1, 2), (0, 2, 1),
(1, 0, 2), (1, 2, 0),
(2, 0, 1), (2, 1, 0))
def __call__(self, image, boxes=None, labels=None):
if random.randint(2):
swap = self.perms[random.randint(len(self.perms))]
shuffle = SwapChannels(swap) # shuffle channels
image = shuffle(image)
return image, boxes, labels
class ConvertColor(object):
def __init__(self, current, transform):
self.transform = transform
self.current = current
def __call__(self, image, boxes=None, labels=None):
if self.current == 'BGR' and self.transform == 'HSV':
image = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
elif self.current == 'RGB' and self.transform == 'HSV':
image = cv2.cvtColor(image, cv2.COLOR_RGB2HSV)
elif self.current == 'BGR' and self.transform == 'RGB':
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
elif self.current == 'HSV' and self.transform == 'BGR':
image = cv2.cvtColor(image, cv2.COLOR_HSV2BGR)
elif self.current == 'HSV' and self.transform == "RGB":
image = cv2.cvtColor(image, cv2.COLOR_HSV2RGB)
else:
raise NotImplementedError
return image, boxes, labels
class RandomContrast(object):
def __init__(self, lower=0.5, upper=1.5):
self.lower = lower
self.upper = upper
assert self.upper >= self.lower, "contrast upper must be >= lower."
assert self.lower >= 0, "contrast lower must be non-negative."
# expects float image
def __call__(self, image, boxes=None, labels=None):
if random.randint(2):
alpha = random.uniform(self.lower, self.upper)
image *= alpha
return image, boxes, labels
class RandomBrightness(object):
def __init__(self, delta=32):
assert delta >= 0.0
assert delta <= 255.0
self.delta = delta
def __call__(self, image, boxes=None, labels=None):
if random.randint(2):
delta = random.uniform(-self.delta, self.delta)
image += delta
return image, boxes, labels
class ToCV2Image(object):
def __call__(self, tensor, boxes=None, labels=None):
return tensor.cpu().numpy().astype(np.float32).transpose((1, 2, 0)), boxes, labels
class ToTensor(object):
def __call__(self, cvimage, boxes=None, labels=None):
return torch.from_numpy(cvimage.astype(np.float32)).permute(2, 0, 1), boxes, labels
class RandomSampleCrop(object):
"""Crop
Arguments:
img (Image): the image being input during training
boxes (Tensor): the original bounding boxes in pt form
labels (Tensor): the class labels for each bbox
mode (float tuple): the min and max jaccard overlaps
Return:
(img, boxes, classes)
img (Image): the cropped image
boxes (Tensor): the adjusted bounding boxes in pt form
labels (Tensor): the class labels for each bbox
"""
def __init__(self):
self.sample_options = (
# using entire original input image
None,
# sample a patch s.t. MIN jaccard w/ obj in .1,.3,.7,.9
(0.1, None),
(0.3, None),
(0.7, None),
(0.9, None),
# randomly sample a patch
(None, None),
)
def __call__(self, image, boxes=None, labels=None):
height, width, _ = image.shape
while True:
# randomly choose a mode
# mode = random.choice(self.sample_options) # throws numpy deprecation warning
mode = self.sample_options[random.randint(len(self.sample_options))]
if mode is None:
return image, boxes, labels
min_iou, max_iou = mode
if min_iou is None:
min_iou = float('-inf')
if max_iou is None:
max_iou = float('inf')
# max trials (50)
for _ in range(50):
current_image = image
w = random.uniform(0.3 * width, width)
h = random.uniform(0.3 * height, height)
# aspect ratio constraint b/t .5 & 2
if h / w < 0.5 or h / w > 2:
continue
left = random.uniform(width - w)
top = random.uniform(height - h)
# convert to integer rect x1,y1,x2,y2
rect = np.array([int(left), int(top), int(left + w), int(top + h)])
# calculate IoU (jaccard overlap) b/t the cropped and gt boxes
overlap = jaccard_numpy(boxes, rect)
# are the min and max overlap constraints satisfied? if not, try again
if overlap.min() < min_iou or overlap.max() > max_iou:
continue
# cut the crop from the image
current_image = current_image[rect[1]:rect[3], rect[0]:rect[2],
:]
# keep overlap with gt box IF center in sampled patch
centers = (boxes[:, :2] + boxes[:, 2:]) / 2.0
# mask in all gt boxes whose centers are below and to the right of the crop's top-left corner
m1 = (rect[0] < centers[:, 0]) * (rect[1] < centers[:, 1])
# mask in all gt boxes whose centers are above and to the left of the crop's bottom-right corner
m2 = (rect[2] > centers[:, 0]) * (rect[3] > centers[:, 1])
# mask in boxes where both m1 and m2 are true
mask = m1 * m2
# have any valid boxes? try again if not
if not mask.any():
continue
# take only matching gt boxes
current_boxes = boxes[mask, :].copy()
# take only matching gt labels
current_labels = labels[mask]
# clip the boxes' top-left corners to the crop's top-left corner
current_boxes[:, :2] = np.maximum(current_boxes[:, :2], rect[:2])
# adjust to crop (by subtracting crop's left, top)
current_boxes[:, :2] -= rect[:2]
# clip the boxes' bottom-right corners to the crop's bottom-right corner
current_boxes[:, 2:] = np.minimum(current_boxes[:, 2:], rect[2:])
# adjust to crop (by subtracting crop's left, top)
current_boxes[:, 2:] -= rect[:2]
return current_image, current_boxes, current_labels
class Expand(object):
def __init__(self, mean):
self.mean = mean
def __call__(self, image, boxes, labels):
if random.randint(2):
return image, boxes, labels
height, width, depth = image.shape
ratio = random.uniform(1, 4)
left = random.uniform(0, width * ratio - width)
top = random.uniform(0, height * ratio - height)
expand_image = np.zeros(
(int(height * ratio), int(width * ratio), depth),
dtype=image.dtype)
expand_image[:, :, :] = self.mean
expand_image[int(top):int(top + height),
int(left):int(left + width)] = image
image = expand_image
boxes = boxes.copy()
boxes[:, :2] += (int(left), int(top))
boxes[:, 2:] += (int(left), int(top))
return image, boxes, labels
class RandomMirror(object):
def __call__(self, image, boxes, classes):
_, width, _ = image.shape
if random.randint(2):
image = image[:, ::-1]
boxes = boxes.copy()
boxes[:, 0::2] = width - boxes[:, 2::-2]
return image, boxes, classes
class SwapChannels(object):
"""Transforms a tensorized image by swapping the channels in the order
specified in the swap tuple.
Args:
swaps (int triple): final order of channels
eg: (2, 1, 0)
"""
def __init__(self, swaps):
self.swaps = swaps
def __call__(self, image):
"""
Args:
image (Tensor): image tensor to be transformed
Return:
a tensor with channels swapped according to swap
"""
# if torch.is_tensor(image):
# image = image.data.cpu().numpy()
# else:
# image = np.array(image)
image = image[:, :, self.swaps]
return image
class PhotometricDistort(object):
def __init__(self):
self.pd = [
RandomContrast(), # RGB
ConvertColor(current="RGB", transform='HSV'), # HSV
RandomSaturation(), # HSV
RandomHue(), # HSV
ConvertColor(current='HSV', transform='RGB'), # RGB
RandomContrast() # RGB
]
self.rand_brightness = RandomBrightness()
self.rand_light_noise = RandomLightingNoise()
def __call__(self, image, boxes, labels):
im = image.copy()
im, boxes, labels = self.rand_brightness(im, boxes, labels)
if random.randint(2):
distort = Compose(self.pd[:-1])
else:
distort = Compose(self.pd[1:])
im, boxes, labels = distort(im, boxes, labels)
return self.rand_light_noise(im, boxes, labels)
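
One subtlety worth a worked check: RandomMirror's `boxes[:, 0::2] = width - boxes[:, 2::-2]` mirrors and swaps x1/x2 in a single assignment (`boxes[:, 2::-2]` reads columns 2 then 0):

import numpy as np
boxes = np.array([[10.0, 5.0, 40.0, 25.0]])
width = 100
boxes[:, 0::2] = width - boxes[:, 2::-2]
print(boxes)  # [[60., 5., 90., 25.]]: new x1 = width - old x2, new x2 = width - old x1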

1
src/vision/utils/__init__.py Normal file

@ -0,0 +1 @@
from .misc import *

293
src/vision/utils/box_utils.py Normal file

@ -0,0 +1,293 @@
import collections
import itertools
import math
from typing import List
import torch
SSDBoxSizes = collections.namedtuple('SSDBoxSizes', ['min', 'max'])
SSDSpec = collections.namedtuple('SSDSpec', ['feature_map_size', 'shrinkage', 'box_sizes', 'aspect_ratios'])
def generate_ssd_priors(specs: List[SSDSpec], image_size, clamp=True) -> torch.Tensor:
"""Generate SSD Prior Boxes.
It returns the center, height and width of the priors. The values are relative to the image size
Args:
specs: SSDSpecs describing the shapes and sizes of the prior boxes, e.g.
specs = [
SSDSpec(38, 8, SSDBoxSizes(30, 60), [2]),
SSDSpec(19, 16, SSDBoxSizes(60, 111), [2, 3]),
SSDSpec(10, 32, SSDBoxSizes(111, 162), [2, 3]),
SSDSpec(5, 64, SSDBoxSizes(162, 213), [2, 3]),
SSDSpec(3, 100, SSDBoxSizes(213, 264), [2]),
SSDSpec(1, 300, SSDBoxSizes(264, 315), [2])
]
image_size: image size.
clamp: if true, clamp the values so they fall within [0.0, 1.0]
Returns:
priors (num_priors, 4): The prior boxes represented as [[center_x, center_y, w, h]]. All the values
are relative to the image size.
"""
priors = []
for spec in specs:
scale = image_size / spec.shrinkage
for j, i in itertools.product(range(spec.feature_map_size), repeat=2):
x_center = (i + 0.5) / scale
y_center = (j + 0.5) / scale
# small sized square box
size = spec.box_sizes.min
h = w = size / image_size
priors.append([
x_center,
y_center,
w,
h
])
# big sized square box
size = math.sqrt(spec.box_sizes.max * spec.box_sizes.min)
h = w = size / image_size
priors.append([
x_center,
y_center,
w,
h
])
# change h/w ratio of the small sized box
size = spec.box_sizes.min
h = w = size / image_size
for ratio in spec.aspect_ratios:
ratio = math.sqrt(ratio)
priors.append([
x_center,
y_center,
w * ratio,
h / ratio
])
priors.append([
x_center,
y_center,
w / ratio,
h * ratio
])
priors = torch.tensor(priors)
if clamp:
torch.clamp(priors, 0.0, 1.0, out=priors)
return priors
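As a sketch (not part of the committed file), feeding the spec list from the docstring back in reproduces the classic SSD300 prior layout:

specs = [
    SSDSpec(38, 8, SSDBoxSizes(30, 60), [2]),
    SSDSpec(19, 16, SSDBoxSizes(60, 111), [2, 3]),
    SSDSpec(10, 32, SSDBoxSizes(111, 162), [2, 3]),
    SSDSpec(5, 64, SSDBoxSizes(162, 213), [2, 3]),
    SSDSpec(3, 100, SSDBoxSizes(213, 264), [2]),
    SSDSpec(1, 300, SSDBoxSizes(264, 315), [2])
]
priors = generate_ssd_priors(specs, image_size=300)
# 38*38*4 + 19*19*6 + 10*10*6 + 5*5*6 + 3*3*4 + 1*1*4 = 8732 priors
print(priors.shape)  # torch.Size([8732, 4])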
def convert_locations_to_boxes(locations, priors, center_variance,
size_variance):
"""Convert regressional location results of SSD into boxes in the form of (center_x, center_y, h, w).
The conversion:
$$predicted\_center * center_variance = \frac {real\_center - prior\_center} {prior\_hw}$$
$$exp(predicted\_hw * size_variance) = \frac {real\_hw} {prior\_hw}$$
We do it in the inverse direction here.
Args:
locations (batch_size, num_priors, 4): the regression output of SSD. It will contain the outputs as well.
priors (num_priors, 4) or (batch_size/1, num_priors, 4): prior boxes.
center_variance: a float used to change the scale of center.
size_variance: a float used to change of scale of size.
Returns:
boxes: priors: [[center_x, center_y, h, w]]. All the values
are relative to the image size.
"""
# priors can have one dimension less.
if priors.dim() + 1 == locations.dim():
priors = priors.unsqueeze(0)
return torch.cat([
locations[..., :2] * center_variance * priors[..., 2:] + priors[..., :2],
torch.exp(locations[..., 2:] * size_variance) * priors[..., 2:]
], dim=locations.dim() - 1)
def convert_boxes_to_locations(center_form_boxes, center_form_priors, center_variance, size_variance):
# priors can have one dimension less
if center_form_priors.dim() + 1 == center_form_boxes.dim():
center_form_priors = center_form_priors.unsqueeze(0)
return torch.cat([
(center_form_boxes[..., :2] - center_form_priors[..., :2]) / center_form_priors[..., 2:] / center_variance,
torch.log(center_form_boxes[..., 2:] / center_form_priors[..., 2:]) / size_variance
], dim=center_form_boxes.dim() - 1)
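The two functions are exact inverses, which a small round-trip check illustrates; this is a sketch with made-up boxes, and 0.1/0.2 are the center/size variances commonly used with SSD:

import torch

priors = torch.tensor([[0.5, 0.5, 0.2, 0.2]])  # center form: cx, cy, w, h
boxes = torch.tensor([[0.55, 0.50, 0.25, 0.20]])
locations = convert_boxes_to_locations(boxes, priors, center_variance=0.1, size_variance=0.2)
decoded = convert_locations_to_boxes(locations, priors, center_variance=0.1, size_variance=0.2)
assert torch.allclose(decoded, boxes)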
def area_of(left_top, right_bottom) -> torch.Tensor:
"""Compute the areas of rectangles given two corners.
Args:
left_top (N, 2): left top corner.
right_bottom (N, 2): right bottom corner.
Returns:
area (N): return the area.
"""
hw = torch.clamp(right_bottom - left_top, min=0.0)
return hw[..., 0] * hw[..., 1]
def iou_of(boxes0, boxes1, eps=1e-5):
"""Return intersection-over-union (Jaccard index) of boxes.
Args:
boxes0 (N, 4): ground truth boxes.
boxes1 (N or 1, 4): predicted boxes.
eps: a small number to avoid 0 as denominator.
Returns:
iou (N): IoU values.
"""
overlap_left_top = torch.max(boxes0[..., :2], boxes1[..., :2])
overlap_right_bottom = torch.min(boxes0[..., 2:], boxes1[..., 2:])
overlap_area = area_of(overlap_left_top, overlap_right_bottom)
area0 = area_of(boxes0[..., :2], boxes0[..., 2:])
area1 = area_of(boxes1[..., :2], boxes1[..., 2:])
return overlap_area / (area0 + area1 - overlap_area + eps)
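A tiny worked example (made-up boxes): two 2x2 boxes overlapping in a 1x1 region give IoU 1 / (4 + 4 - 1):

import torch

a = torch.tensor([[0., 0., 2., 2.]])  # corner form, area 4
b = torch.tensor([[1., 1., 3., 3.]])  # area 4, intersection area 1
print(iou_of(a, b))  # tensor([0.1429]), up to the eps term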
def assign_priors(gt_boxes, gt_labels, corner_form_priors,
iou_threshold):
"""Assign ground truth boxes and targets to priors.
Args:
gt_boxes (num_targets, 4): ground truth boxes.
gt_labels (num_targets): labels of targets.
corner_form_priors (num_priors, 4): corner form priors.
iou_threshold: priors whose best IoU with every target falls below this value are labeled background.
Returns:
boxes (num_priors, 4): real values for priors.
labels (num_priors): labels for priors.
"""
# size: num_priors x num_targets
ious = iou_of(gt_boxes.unsqueeze(0), corner_form_priors.unsqueeze(1))
# size: num_priors
best_target_per_prior, best_target_per_prior_index = ious.max(1)
# size: num_targets
best_prior_per_target, best_prior_per_target_index = ious.max(0)
for target_index, prior_index in enumerate(best_prior_per_target_index):
best_target_per_prior_index[prior_index] = target_index
# 2.0 is used to make sure every target has a prior assigned
best_target_per_prior.index_fill_(0, best_prior_per_target_index, 2)
# size: num_priors
labels = gt_labels[best_target_per_prior_index]
labels[best_target_per_prior < iou_threshold] = 0 # the background id
boxes = gt_boxes[best_target_per_prior_index]
return boxes, labels
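A minimal matching sketch with made-up boxes; label 0 is the background id as in the line above:

import torch

gt_boxes = torch.tensor([[0.1, 0.1, 0.4, 0.4]])
gt_labels = torch.tensor([1])
priors = torch.tensor([[0.1, 0.1, 0.4, 0.4],  # overlaps the target perfectly
                       [0.6, 0.6, 0.9, 0.9]])  # no overlap -> background
boxes, labels = assign_priors(gt_boxes, gt_labels, priors, iou_threshold=0.5)
print(labels)  # tensor([1, 0])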
def hard_negative_mining(loss, labels, neg_pos_ratio):
"""
It is used to suppress the presence of a large number of negative predictions.
It works on image level, not batch level.
For any example/image, it keeps all the positive predictions and
cuts the number of negative predictions to make sure the ratio
between the negative examples and positive examples is no more
than the given ratio for an image.
Args:
loss (N, num_priors): the loss for each example.
labels (N, num_priors): the labels.
neg_pos_ratio: the ratio between the negative examples and positive examples.
"""
pos_mask = labels > 0
num_pos = pos_mask.long().sum(dim=1, keepdim=True)
num_neg = num_pos * neg_pos_ratio
loss[pos_mask] = -math.inf
_, indexes = loss.sort(dim=1, descending=True)
_, orders = indexes.sort(dim=1)
neg_mask = orders < num_neg
return pos_mask | neg_mask
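For instance, with one positive prior and neg_pos_ratio=3 the mask keeps the positive plus the three highest-loss negatives; note the function writes -inf into the loss tensor in place, hence the clone (a sketch with made-up numbers):

import torch

loss = torch.tensor([[5., 4., 3., 2., 1., 0.5]])
labels = torch.tensor([[1, 0, 0, 0, 0, 0]])  # a single positive prior
mask = hard_negative_mining(loss.clone(), labels, neg_pos_ratio=3)
print(mask)  # tensor([[True, True, True, True, False, False]])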
def center_form_to_corner_form(locations):
return torch.cat([locations[..., :2] - locations[..., 2:] / 2,
locations[..., :2] + locations[..., 2:] / 2], locations.dim() - 1)
def corner_form_to_center_form(boxes):
return torch.cat([
(boxes[..., :2] + boxes[..., 2:]) / 2,
boxes[..., 2:] - boxes[..., :2]
], boxes.dim() - 1)
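A quick round-trip sketch of the two layout conversions (made-up box):

import torch

center = torch.tensor([[0.5, 0.5, 0.2, 0.4]])  # cx, cy, w, h
corner = center_form_to_corner_form(center)
print(corner)  # tensor([[0.4000, 0.3000, 0.6000, 0.7000]])
assert torch.allclose(corner_form_to_center_form(corner), center)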
def hard_nms(box_scores, iou_threshold, top_k=-1, candidate_size=200):
"""
Args:
box_scores (N, 5): boxes in corner-form and probabilities.
iou_threshold: intersection over union threshold.
top_k: keep top_k results. If k <= 0, keep all the results.
candidate_size: only consider the candidates with the highest scores.
Returns:
box_scores (K, 5): the kept boxes together with their scores.
"""
scores = box_scores[:, -1]
boxes = box_scores[:, :-1]
picked = []
_, indexes = scores.sort(descending=True)
indexes = indexes[:candidate_size]
while len(indexes) > 0:
current = indexes[0]
picked.append(current.item())
if 0 < top_k == len(picked) or len(indexes) == 1:
break
current_box = boxes[current, :]
indexes = indexes[1:]
rest_boxes = boxes[indexes, :]
iou = iou_of(
rest_boxes,
current_box.unsqueeze(0),
)
indexes = indexes[iou <= iou_threshold]
return box_scores[picked, :]
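A small sketch of hard NMS suppressing a heavily overlapping box (toy data, not part of the committed file):

import torch

box_scores = torch.tensor([
    [0.00, 0.00, 1.00, 1.00, 0.9],
    [0.05, 0.05, 1.05, 1.05, 0.8],  # IoU ~0.82 with the first box
    [2.00, 2.00, 3.00, 3.00, 0.7],
])
kept = hard_nms(box_scores, iou_threshold=0.5)
print(kept[:, -1])  # tensor([0.9000, 0.7000]): the 0.8 box is suppressed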
def nms(box_scores, nms_method=None, score_threshold=None, iou_threshold=None,
sigma=0.5, top_k=-1, candidate_size=200):
if nms_method == "soft":
return soft_nms(box_scores, score_threshold, sigma, top_k)
else:
return hard_nms(box_scores, iou_threshold, top_k, candidate_size=candidate_size)
def soft_nms(box_scores, score_threshold, sigma=0.5, top_k=-1):
"""Soft NMS implementation.
References:
https://arxiv.org/abs/1704.04503
https://github.com/facebookresearch/Detectron/blob/master/detectron/utils/cython_nms.pyx
Args:
box_scores (N, 5): boxes in corner-form and probabilities.
score_threshold: boxes with scores less than value are not considered.
sigma: the parameter in score re-computation.
scores[i] = scores[i] * exp(-(iou_i)^2 / sigma)
top_k: keep top_k results. If k <= 0, keep all the results.
Returns:
picked_box_scores (K, 5): results of NMS.
"""
picked_box_scores = []
while box_scores.size(0) > 0:
max_score_index = torch.argmax(box_scores[:, 4])
cur_box_prob = torch.tensor(box_scores[max_score_index, :])
picked_box_scores.append(cur_box_prob)
if len(picked_box_scores) == top_k > 0 or box_scores.size(0) == 1:
break
cur_box = cur_box_prob[:-1]
box_scores[max_score_index, :] = box_scores[-1, :]
box_scores = box_scores[:-1, :]
ious = iou_of(cur_box.unsqueeze(0), box_scores[:, :-1])
box_scores[:, -1] = box_scores[:, -1] * torch.exp(-(ious * ious) / sigma)
box_scores = box_scores[box_scores[:, -1] > score_threshold, :]
if len(picked_box_scores) > 0:
return torch.stack(picked_box_scores)
else:
return torch.tensor([])
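With the same toy boxes, soft NMS decays the overlapping score instead of dropping the box outright; here 0.8 * exp(-0.82^2 / 0.5) is roughly 0.21, which then falls under the 0.3 threshold (a sketch; the function consumes its input, hence the clone):

import torch

box_scores = torch.tensor([
    [0.00, 0.00, 1.00, 1.00, 0.9],
    [0.05, 0.05, 1.05, 1.05, 0.8],
    [2.00, 2.00, 3.00, 3.00, 0.7],
])
kept = soft_nms(box_scores.clone(), score_threshold=0.3, sigma=0.5)
print(kept[:, -1])  # tensor([0.9000, 0.7000])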

View File

@ -0,0 +1,238 @@
import itertools
import math
from typing import List
import numpy as np
from .box_utils import SSDSpec
def generate_ssd_priors(specs: List[SSDSpec], image_size, clamp=True):
"""Generate SSD Prior Boxes.
It returns the center, height and width of the priors. The values are relative to the image size
Args:
specs: SSDSpecs describing the shapes and sizes of the prior boxes, e.g.
specs = [
SSDSpec(38, 8, SSDBoxSizes(30, 60), [2]),
SSDSpec(19, 16, SSDBoxSizes(60, 111), [2, 3]),
SSDSpec(10, 32, SSDBoxSizes(111, 162), [2, 3]),
SSDSpec(5, 64, SSDBoxSizes(162, 213), [2, 3]),
SSDSpec(3, 100, SSDBoxSizes(213, 264), [2]),
SSDSpec(1, 300, SSDBoxSizes(264, 315), [2])
]
image_size: image size.
clamp: if true, clamp the values so they fall within [0.0, 1.0]
Returns:
priors (num_priors, 4): The prior boxes represented as [[center_x, center_y, w, h]]. All the values
are relative to the image size.
"""
priors = []
for spec in specs:
scale = image_size / spec.shrinkage
for j, i in itertools.product(range(spec.feature_map_size), repeat=2):
x_center = (i + 0.5) / scale
y_center = (j + 0.5) / scale
# small sized square box
size = spec.box_sizes.min
h = w = size / image_size
priors.append([
x_center,
y_center,
w,
h
])
# big sized square box
size = math.sqrt(spec.box_sizes.max * spec.box_sizes.min)
h = w = size / image_size
priors.append([
x_center,
y_center,
w,
h
])
# change h/w ratio of the small sized box
size = spec.box_sizes.min
h = w = size / image_size
for ratio in spec.aspect_ratios:
ratio = math.sqrt(ratio)
priors.append([
x_center,
y_center,
w * ratio,
h / ratio
])
priors.append([
x_center,
y_center,
w / ratio,
h * ratio
])
priors = np.array(priors, dtype=np.float32)
if clamp:
np.clip(priors, 0.0, 1.0, out=priors)
return priors
def convert_locations_to_boxes(locations, priors, center_variance,
size_variance):
"""Convert regressional location results of SSD into boxes in the form of (center_x, center_y, h, w).
The conversion:
$$predicted\_center * center_variance = \frac {real\_center - prior\_center} {prior\_hw}$$
$$exp(predicted\_hw * size_variance) = \frac {real\_hw} {prior\_hw}$$
We do it in the inverse direction here.
Args:
locations (batch_size, num_priors, 4): the regression output of SSD. It will contain the outputs as well.
priors (num_priors, 4) or (batch_size/1, num_priors, 4): prior boxes.
center_variance: a float used to change the scale of center.
size_variance: a float used to change of scale of size.
Returns:
boxes: priors: [[center_x, center_y, h, w]]. All the values
are relative to the image size.
"""
# priors can have one dimension less.
if len(priors.shape) + 1 == len(locations.shape):
priors = np.expand_dims(priors, 0)
return np.concatenate([
locations[..., :2] * center_variance * priors[..., 2:] + priors[..., :2],
np.exp(locations[..., 2:] * size_variance) * priors[..., 2:]
], axis=len(locations.shape) - 1)
def convert_boxes_to_locations(center_form_boxes, center_form_priors, center_variance, size_variance):
# priors can have one dimension less
if len(center_form_priors.shape) + 1 == len(center_form_boxes.shape):
center_form_priors = np.expand_dims(center_form_priors, 0)
return np.concatenate([
(center_form_boxes[..., :2] - center_form_priors[..., :2]) / center_form_priors[..., 2:] / center_variance,
np.log(center_form_boxes[..., 2:] / center_form_priors[..., 2:]) / size_variance
], axis=len(center_form_boxes.shape) - 1)
def area_of(left_top, right_bottom):
"""Compute the areas of rectangles given two corners.
Args:
left_top (N, 2): left top corner.
right_bottom (N, 2): right bottom corner.
Returns:
area (N): return the area.
"""
hw = np.clip(right_bottom - left_top, 0.0, None)
return hw[..., 0] * hw[..., 1]
def iou_of(boxes0, boxes1, eps=1e-5):
"""Return intersection-over-union (Jaccard index) of boxes.
Args:
boxes0 (N, 4): ground truth boxes.
boxes1 (N or 1, 4): predicted boxes.
eps: a small number to avoid 0 as denominator.
Returns:
iou (N): IoU values.
"""
overlap_left_top = np.maximum(boxes0[..., :2], boxes1[..., :2])
overlap_right_bottom = np.minimum(boxes0[..., 2:], boxes1[..., 2:])
overlap_area = area_of(overlap_left_top, overlap_right_bottom)
area0 = area_of(boxes0[..., :2], boxes0[..., 2:])
area1 = area_of(boxes1[..., :2], boxes1[..., 2:])
return overlap_area / (area0 + area1 - overlap_area + eps)
def center_form_to_corner_form(locations):
return np.concatenate([locations[..., :2] - locations[..., 2:] / 2,
locations[..., :2] + locations[..., 2:] / 2], len(locations.shape) - 1)
def corner_form_to_center_form(boxes):
return np.concatenate([
(boxes[..., :2] + boxes[..., 2:]) / 2,
boxes[..., 2:] - boxes[..., :2]
], len(boxes.shape) - 1)
def hard_nms(box_scores, iou_threshold, top_k=-1, candidate_size=200):
"""
Args:
box_scores (N, 5): boxes in corner-form and probabilities.
iou_threshold: intersection over union threshold.
top_k: keep top_k results. If k <= 0, keep all the results.
candidate_size: only consider the candidates with the highest scores.
Returns:
box_scores (K, 5): the kept boxes together with their scores.
"""
scores = box_scores[:, -1]
boxes = box_scores[:, :-1]
picked = []
# _, indexes = scores.sort(descending=True)
indexes = np.argsort(scores)
# indexes = indexes[:candidate_size]
indexes = indexes[-candidate_size:]
while len(indexes) > 0:
# current = indexes[0]
current = indexes[-1]
picked.append(current)
if 0 < top_k == len(picked) or len(indexes) == 1:
break
current_box = boxes[current, :]
# indexes = indexes[1:]
indexes = indexes[:-1]
rest_boxes = boxes[indexes, :]
iou = iou_of(
rest_boxes,
np.expand_dims(current_box, axis=0),
)
indexes = indexes[iou <= iou_threshold]
return box_scores[picked, :]
# def nms(box_scores, nms_method=None, score_threshold=None, iou_threshold=None,
# sigma=0.5, top_k=-1, candidate_size=200):
# if nms_method == "soft":
# return soft_nms(box_scores, score_threshold, sigma, top_k)
# else:
# return hard_nms(box_scores, iou_threshold, top_k, candidate_size=candidate_size)
#
# def soft_nms(box_scores, score_threshold, sigma=0.5, top_k=-1):
# """Soft NMS implementation.
#
# References:
# https://arxiv.org/abs/1704.04503
# https://github.com/facebookresearch/Detectron/blob/master/detectron/utils/cython_nms.pyx
#
# Args:
# box_scores (N, 5): boxes in corner-form and probabilities.
# score_threshold: boxes with scores less than value are not considered.
# sigma: the parameter in score re-computation.
# scores[i] = scores[i] * exp(-(iou_i)^2 / sigma)
# top_k: keep top_k results. If k <= 0, keep all the results.
# Returns:
# picked_box_scores (K, 5): results of NMS.
# """
# picked_box_scores = []
# while box_scores.size(0) > 0:
# max_score_index = torch.argmax(box_scores[:, 4])
# cur_box_prob = torch.tensor(box_scores[max_score_index, :])
# picked_box_scores.append(cur_box_prob)
# if len(picked_box_scores) == top_k > 0 or box_scores.size(0) == 1:
# break
# cur_box = cur_box_prob[:-1]
# box_scores[max_score_index, :] = box_scores[-1, :]
# box_scores = box_scores[:-1, :]
# ious = iou_of(cur_box.unsqueeze(0), box_scores[:, :-1])
# box_scores[:, -1] = box_scores[:, -1] * torch.exp(-(ious * ious) / sigma)
# box_scores = box_scores[box_scores[:, -1] > score_threshold, :]
# if len(picked_box_scores) > 0:
# return torch.stack(picked_box_scores)
# else:
# return torch.tensor([])

View File

@ -0,0 +1,32 @@
import numpy as np
def compute_average_precision(precision, recall):
"""
It computes average precision based on the definition used in the PASCAL VOC competition: the area under
the precision-recall curve. Recall follows the normal definition. Precision is a variant:
pascal_precision[i] = typical_precision[i:].max()
"""
# identical but faster version of new_precision[i] = old_precision[i:].max()
precision = np.concatenate([[0.0], precision, [0.0]])
for i in range(len(precision) - 1, 0, -1):
precision[i - 1] = np.maximum(precision[i - 1], precision[i])
# find the index where the value changes
recall = np.concatenate([[0.0], recall, [1.0]])
changing_points = np.where(recall[1:] != recall[:-1])[0]
# compute the area under the curve
areas = (recall[changing_points + 1] - recall[changing_points]) * precision[changing_points + 1]
return areas.sum()
def compute_voc2007_average_precision(precision, recall):
ap = 0.
for t in np.arange(0., 1.1, 0.1):
if np.sum(recall >= t) == 0:
p = 0
else:
p = np.max(precision[recall >= t])
ap = ap + p / 11.
return ap
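A tiny worked example with made-up precision/recall arrays: a detector that reaches recall 0.5 at perfect precision and finds nothing beyond that scores 0.5 under the area-based definition and 6/11 under the 11-point VOC2007 rule:

import numpy as np

precision = np.array([1.0, 0.5])
recall = np.array([0.5, 0.5])
print(compute_average_precision(precision, recall))  # 0.5
print(compute_voc2007_average_precision(precision, recall))  # ~0.545 (6/11)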

46
src/vision/utils/misc.py Normal file
View File

@ -0,0 +1,46 @@
import time
import torch
def str2bool(s):
return s.lower() in ('true', '1')
class Timer:
def __init__(self):
self.clock = {}
def start(self, key="default"):
self.clock[key] = time.time()
def end(self, key="default"):
if key not in self.clock:
raise Exception(f"{key} is not in the clock.")
interval = time.time() - self.clock[key]
del self.clock[key]
return interval
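Typical Timer usage (a sketch):

import time

timer = Timer()
timer.start("epoch")
time.sleep(0.1)  # stand-in for a training step
print(f"epoch took {timer.end('epoch'):.3f}s")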
def save_checkpoint(epoch, net_state_dict, optimizer_state_dict, best_score, checkpoint_path, model_path):
torch.save({
'epoch': epoch,
'model': net_state_dict,
'optimizer': optimizer_state_dict,
'best_score': best_score
}, checkpoint_path)
torch.save(net_state_dict, model_path)
def load_checkpoint(checkpoint_path):
return torch.load(checkpoint_path)
def freeze_net_layers(net):
for param in net.parameters():
param.requires_grad = False
def store_labels(path, labels):
with open(path, "w") as f:
f.write("\n".join(labels))
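A round-trip sketch of the checkpoint helpers; the tiny Linear module and the file paths are made up for illustration:

import torch.nn as nn
import torch.optim as optim

net = nn.Linear(4, 2)
optimizer = optim.SGD(net.parameters(), lr=0.01)
save_checkpoint(epoch=3, net_state_dict=net.state_dict(),
                optimizer_state_dict=optimizer.state_dict(), best_score=0.87,
                checkpoint_path="ckpt.pth", model_path="model.pth")
state = load_checkpoint("ckpt.pth")
net.load_state_dict(state["model"])  # resume from epoch state["epoch"]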

View File

@ -0,0 +1,82 @@
from collections import OrderedDict
import torch.nn as nn
class ModelBook:
"""Maintain the mapping between modules and their paths.
Example:
book = ModelBook(model_ft)
for p, m in book.conv2d_modules():
print('path:', p, 'num of filters:', m.out_channels)
assert m is book.get_module(p)
"""
def __init__(self, model):
self._model = model
self._modules = OrderedDict()
self._paths = OrderedDict()
path = []
self._construct(self._model, path)
def _construct(self, module, path):
if not module._modules:
return
for name, m in module._modules.items():
cur_path = tuple(path + [name])
self._paths[m] = cur_path
self._modules[cur_path] = m
self._construct(m, path + [name])
def conv2d_modules(self):
return self.modules(nn.Conv2d)
def linear_modules(self):
return self.modules(nn.Linear)
def modules(self, module_type=None):
for p, m in self._modules.items():
if not module_type or isinstance(m, module_type):
yield p, m
def num_of_conv2d_modules(self):
return self.num_of_modules(nn.Conv2d)
def num_of_conv2d_filters(self):
"""Return the sum of out_channels of all conv2d layers.
Here we treat the sub weight with size of [in_channels, h, w] as a single filter.
"""
num_filters = 0
for _, m in self.conv2d_modules():
num_filters += m.out_channels
return num_filters
def num_of_linear_modules(self):
return self.num_of_modules(nn.Linear)
def num_of_linear_filters(self):
num_filters = 0
for _, m in self.linear_modules():
num_filters += m.out_features
return num_filters
def num_of_modules(self, module_type=None):
num = 0
for p, m in self._modules.items():
if not module_type or isinstance(m, module_type):
num += 1
return num
def get_module(self, path):
return self._modules.get(path)
def get_path(self, module):
return self._paths.get(module)
def update(self, path, module):
old_module = self._modules[path]
del self._paths[old_module]
self._paths[module] = path
self._modules[path] = module
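A small sketch complementing the docstring example, using a toy Sequential model made up for illustration:

import torch.nn as nn

model = nn.Sequential(nn.Conv2d(3, 8, 3), nn.ReLU(), nn.Conv2d(8, 16, 3))
book = ModelBook(model)
print(book.num_of_conv2d_modules())  # 2
print(book.num_of_conv2d_filters())  # 24 = 8 + 16
for path, m in book.conv2d_modules():
    print(path, m.out_channels)  # ('0',) 8 then ('2',) 16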