-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathdata_loader.py
More file actions
153 lines (129 loc) · 6.17 KB
/
data_loader.py
File metadata and controls
153 lines (129 loc) · 6.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import logging
import os
import numpy as np
import torch.utils.data as data
from torchvision.datasets.video_utils import VideoClips
class VideoIterTrain(data.Dataset):
    """Training dataset yielding fixed-length, strided clips from a video list.

    Each item is ``(clip, label, clip_idx, directory, file_name)`` where
    ``label`` is 0 when the video path contains the substring "Normal" and
    1 otherwise (anomalous).
    """

    def __init__(self,
                 dataset_path,
                 annotation_path,
                 clip_length,
                 frame_stride,
                 video_transform=None,
                 name="<NO_NAME>",
                 return_item_subpath=False,
                 shuffle_list_seed=None):
        """
        Args:
            dataset_path: root directory containing the video files.
            annotation_path: text file whose first whitespace-separated token
                on each line is a video path relative to ``dataset_path``.
            clip_length: number of frames in a returned clip (after striding).
            frame_stride: temporal stride between sampled frames.
            video_transform: optional callable applied to each clip tensor.
            name: phase name, used only in the init log message.
            return_item_subpath: stored for interface compatibility; not read
                by any method in this class.
            shuffle_list_seed: seed for the RNG used to resample the index
                when a clip fails to decode.

        Raises:
            FileNotFoundError: if ``dataset_path`` or ``annotation_path``
                does not exist.
        """
        super(VideoIterTrain, self).__init__()
        self.force_color = True
        self.dataset_path = dataset_path
        self.frames_stride = frame_stride
        self.video_transform = video_transform
        self.return_item_subpath = return_item_subpath
        # `is not None` so an explicit seed of 0 is honored (same value either way,
        # but the intent is clearer and robust to future non-zero-default changes).
        self.rng = np.random.RandomState(shuffle_list_seed if shuffle_list_seed is not None else 0)
        # load video list
        self.video_list = self._get_video_list(dataset_path=self.dataset_path,
                                               annotation_path=annotation_path)
        # Raw frames spanned by one clip before striding down to `clip_length`.
        self.total_clip_length_in_frames = clip_length * frame_stride
        self.video_clips = VideoClips(video_paths=self.video_list,
                                      clip_length_in_frames=self.total_clip_length_in_frames,
                                      frames_between_clips=self.total_clip_length_in_frames)
        logging.info("VideoIter:: iterator initialized (phase: '{:s}', num: {:d})".format(name, len(self.video_list)))

    def getitem_from_raw_video(self, idx):
        """Decode clip ``idx`` and return (clip, label, clip_idx, directory, file_name)."""
        video, _, _, _ = self.video_clips.get_clip(idx)
        video_idx, clip_idx = self.video_clips.get_clip_location(idx)
        video_path = self.video_clips.video_paths[video_idx]
        # Keep every `frames_stride`-th frame of the raw clip.
        in_clip_frames = list(range(0, self.total_clip_length_in_frames, self.frames_stride))
        video = video[in_clip_frames]
        if self.video_transform is not None:
            video = self.video_transform(video)
        # Paths without "Normal" are treated as anomalous.
        label = 0 if "Normal" in video_path else 1
        directory, file_name = video_path.split(os.sep)[-2:]
        file_name = file_name.split('.')[0]  # drop the file extension
        return video, label, clip_idx, directory, file_name

    def __getitem__(self, index):
        # On decode failure, retry with a random index drawn from the seeded RNG.
        # NOTE(review): if every clip is unreadable this loops forever — confirm
        # that is acceptable for this pipeline.
        succ = False
        while not succ:
            try:
                clip_input, label, sampled_idx, directory, file_name = self.getitem_from_raw_video(index)
                succ = True
            except Exception as e:
                index = self.rng.choice(range(0, self.__len__()))
                logging.warning("VideoIter:: ERROR!! (Force using another index:\n{})\n{}".format(index, e))
        return clip_input, label, sampled_idx, directory, file_name

    def __len__(self):
        # NOTE(review): this counts videos, while __getitem__ indexes into
        # VideoClips' clip space (one video may hold several clips), so only
        # the first len(video_list) clips are ever sampled — confirm intended.
        return len(self.video_list)

    def _get_video_list(self, dataset_path, annotation_path):
        """Read the annotation file and return video paths joined onto ``dataset_path``.

        Raises:
            FileNotFoundError: if either input path does not exist.
        """
        # Explicit raises instead of `assert`: asserts are stripped under -O
        # and the original error messages were commented out entirely.
        if not os.path.exists(dataset_path):
            raise FileNotFoundError("VideoIter:: failed to locate: `{}'".format(dataset_path))
        if not os.path.exists(annotation_path):
            raise FileNotFoundError("VideoIter:: failed to locate: `{}'".format(annotation_path))
        vid_list = []
        with open(annotation_path, 'r') as f:
            for line in f:
                items = line.split()
                if not items:
                    continue  # tolerate blank lines in the annotation file
                # split() tokens never contain '\n', so no extra stripping needed
                vid_list.append(os.path.join(dataset_path, items[0]))
        return vid_list
class VideoIterVal(data.Dataset):
    """Validation dataset yielding fixed-length, strided clips from a video list.

    Each item is ``(clip, label, clip_idx, directory, file_name)`` where
    ``label`` is 0 when the video path contains the substring "Normal" and
    1 otherwise (anomalous).
    """

    def __init__(self,
                 dataset_path,
                 annotation_path,
                 clip_length,
                 frame_stride,
                 video_transform=None,
                 name="<NO_NAME>",
                 return_item_subpath=False,
                 shuffle_list_seed=None):
        """
        Args:
            dataset_path: root directory containing the video files.
            annotation_path: text file whose first whitespace-separated token
                on each line is a video path relative to ``dataset_path``.
            clip_length: number of frames in a returned clip (after striding).
            frame_stride: temporal stride between sampled frames.
            video_transform: optional callable applied to each clip tensor.
            name: phase name, used only in the init log message.
            return_item_subpath: stored for interface compatibility; not read
                by any method in this class.
            shuffle_list_seed: seed for the RNG used to resample the index
                when a clip fails to decode.

        Raises:
            FileNotFoundError: if ``dataset_path`` or ``annotation_path``
                does not exist.
        """
        super(VideoIterVal, self).__init__()
        # load params
        self.frames_stride = frame_stride
        self.dataset_path = dataset_path
        self.video_transform = video_transform
        self.return_item_subpath = return_item_subpath
        # `is not None` so an explicit seed of 0 is honored.
        self.rng = np.random.RandomState(shuffle_list_seed if shuffle_list_seed is not None else 0)
        # load video list
        self.video_list = self._get_video_list(dataset_path=self.dataset_path,
                                               annotation_path=annotation_path)
        # Raw frames spanned by one clip before striding down to `clip_length`.
        self.total_clip_length_in_frames = clip_length * frame_stride
        self.video_clips = VideoClips(video_paths=self.video_list,
                                      clip_length_in_frames=self.total_clip_length_in_frames,
                                      frames_between_clips=self.total_clip_length_in_frames)
        logging.info("VideoIter:: iterator initialized (phase: '{:s}', num: {:d})".format(name, len(self.video_list)))

    def getitem_from_raw_video(self, idx):
        """Decode clip ``idx`` and return (clip, label, clip_idx, directory, file_name)."""
        video, _, _, _ = self.video_clips.get_clip(idx)
        video_idx, clip_idx = self.video_clips.get_clip_location(idx)
        video_path = self.video_clips.video_paths[video_idx]
        if self.video_transform is not None:
            video = self.video_transform(video)
        # Paths without "Normal" are treated as anomalous.
        label = 0 if "Normal" in video_path else 1
        directory, file_name = video_path.split(os.sep)[-2:]
        file_name = file_name.split('.')[0]  # drop the file extension
        # Keep every `frames_stride`-th frame of the (possibly transformed) clip.
        in_clip_frames = list(range(0, self.total_clip_length_in_frames, self.frames_stride))
        return video[in_clip_frames], label, clip_idx, directory, file_name

    def __getitem__(self, index):
        # On decode failure, retry with a random index drawn from the seeded RNG.
        # NOTE(review): if every clip is unreadable this loops forever — confirm
        # that is acceptable for this pipeline.
        succ = False
        while not succ:
            try:
                clip_input, label, sampled_idx, directory, file_name = self.getitem_from_raw_video(index)
                succ = True
            except Exception as e:
                index = self.rng.choice(range(0, self.__len__()))
                logging.warning("VideoIter:: ERROR!! (Force using another index:\n{})\n{}".format(index, e))
        return clip_input, label, sampled_idx, directory, file_name

    def __len__(self):
        # NOTE(review): this counts videos, while __getitem__ indexes into
        # VideoClips' clip space (one video may hold several clips), so only
        # the first len(video_list) clips are ever sampled — confirm intended.
        return len(self.video_list)

    def _get_video_list(self, dataset_path, annotation_path):
        """Read the annotation file and return video paths joined onto ``dataset_path``.

        Raises:
            FileNotFoundError: if either input path does not exist.
        """
        # Explicit raises instead of `assert`: asserts are stripped under -O
        # and the original error messages were commented out entirely.
        if not os.path.exists(dataset_path):
            raise FileNotFoundError("VideoIter:: failed to locate: `{}'".format(dataset_path))
        if not os.path.exists(annotation_path):
            raise FileNotFoundError("VideoIter:: failed to locate: `{}'".format(annotation_path))
        vid_list = []
        with open(annotation_path, 'r') as f:
            for line in f:
                items = line.split()
                if not items:
                    continue  # tolerate blank lines in the annotation file
                vid_list.append(os.path.join(dataset_path, items[0]))
        return vid_list