S'S ALGORITHM

这部分使用Tensorflow和PIL库进行图像识别的完整流程学习。

Image Files

关于图像的存储和像素的解释。

计算机将图像文件存储为一系列二进制数据。当需要显示图像时,计算机会读取这些二进制数据并解码成每个像素点的颜色值和位置信息。然后将这些像素点的颜色值按照特定的顺序映射到显示器上的对应位置,从而重现出图像。

不同的图像文件格式(如JPG、PNG等)使用不同的编码方式来存储和压缩图像数据,但最终都需要被解码还原成像素阵列。

像素(pixel)是构成数字图像的最小单元。它是显示器上的一个小方块,可以单独控制发出的颜色和亮度。 每个像素都由红(R)、绿(G)、蓝(B)三种颜色通道的值来定义其最终颜色。通常使用8位(0-255)来存储每种颜色通道的亮度值,从而可以组合出约1600万种不同的颜色。

像素根据图像的解释采取不同的形式:

在PyTorch中,输入模型的图像数据张量的维度顺序通常是CHW(Channel, Height, Width),即通道数在前,高和宽在后。

而在TensorFlow中,输入模型的图像数据张量的维度顺序通常是HWC(Height, Width, Channel),即高、宽在前,通道数在后。

Tensorflow的内置方法,就可以读取图片获得图像的像素值。value = tf.io.read_file(filename)

Image Type

这部分主要是关于图像解码Decoding的内容。

根据图像的类型不同,Tensorflow解码图像的方式也不同,解码png图像为tf.io.decode_png,解码jpeg图像为tf.io.decode_jpeg,解码任何图像为tf.io.decode_image

既然最后一个选择可以解码所有图像,我们为什么,还要对特定的png,或者jpeg进行特别的解码呢。这是因为总有特定的需求,要求你一定要用特定的文件格式,这种情况下,使用特定的方法,可以更好地声明类型,以及在内部更高效地处理。

还有一个原因就是,万能的decode_image可以解码GIF文件,这时候的输出是(num_frames, height, width, channels),这和我们预期的数据形状不符合,不方便我们下一步的处理。

import tensorflow as tf

# Decode image data from a file in Tensorflow
def decode_image(filename image_type resize_shape channels=0):
    value = tf.io.read_file(filename)
    if image_type == 'png':
        decoded_image = tf.io.decode_png(value channels=channels)
    elif image_type == 'jpeg':
        decoded_image = tf.io.decode_jpeg(value channels=channels)
    else:
        decoded_image = tf.io.decode_image(value channels=channels)

Image Resizing

关于改变图像size,官方代码和解析如下:

tf.image.resize(
    images # 输入的图像张量
    size # 目标尺寸大小,可以是张量或列表
    method=ResizeMethod.BILINEAR # 指定插值算法,BILINEAR为双线性插值
    preserve_aspect_ratio=False # 是否保持长宽比,设为False时可能会导致拉伸
    antialias=False # 是否启用抗锯齿,通常用于放大图像以防止锯齿出现
    name=None # 该操作的可选名称
)

该函数返回一个与images类型相同的4D或3D张量,其形状为(批次,新高度,新宽度,通道)或(新高度,新宽度,通道)。

其中的插值算法(Interpolation algorithm)是在图像处理中用于计算图像缩放时新像素值的技术。当改变图像大小时,需要计算出新图像中每个像素的值,而这些值不可能从原图像中直接获得,因此需要通过插值算法对原图像像素值进行计算和重新采样,以得到新图像。

插值算法在图像处理中的主要作用包括:

  1. 图像缩放(Scaling)
  2. 图像旋转(Rotation)
  3. 图像翘曲校正(Dewarping)
  4. 图像超分辨率重建(Super-resolution)

上面提到的三种不同插值算法及其区别:

  1. 最近邻插值(Nearest-neighbor): 最简单的算法,将输入像素的值直接赋给输出像素,不进行任何插值计算。这种方法运算快,但会产生锯齿和伪影。

  2. 双线性插值(Bilinear): 在2x2像素邻域内,使用加权平均的线性插值方法计算新像素值。它比最近邻更平滑,但可能会使图像看起来有些模糊。适用于缩小图像尺寸。

  3. 双三次插值(Bicubic): 在4x4像素邻域内,使用加权平均的三次卷积插值算法计算新像素值。它比双线性更尊重细节和边缘,减少了模糊现象。适用于放大图像尺寸时保持较高质量。但计算量也更大。

总的来说,最近邻简单快速但质量最差;双线性适中;双三次则在质量和计算量之间权衡。在实际应用中,通常根据具体场景来权衡使用不同的插值算法,以在图像质量和计算效率之间寻求平衡。

在上一部分中的函数中,加上resize方法,就是一个完整的处理图像的函数:

import tensorflow as tf

# Decode image data from a file in Tensorflow
def decode_image(filename image_type resize_shape channels=0):
    value = tf.io.read_file(filename)
    if image_type == 'png':
        decoded_image = tf.io.decode_png(value channels=channels)
    elif image_type == 'jpeg':
        decoded_image = tf.io.decode_jpeg(value channels=channels)
    else:
        decoded_image = tf.io.decode_image(value channels=channels)
    # 这里是新加的代码,指定了image和size
    if resize_shape is not None and image_type in ("png" "jpeg"):        
        decoded_image = tf.image.resize(decoded_image resize_shape)
    
    return decoded_image

Dataset

在进行图像处理的时候不可能只有一张图像,所以这里的任务即为,将图像们打包为一个set,同时对他们应用上面提到的 decode_image 函数,对每一张图像进行 map 函数操作。

为什么使用 map 而不是 for loop,因为 map 是一种并行计算,比 for 有更高的效率。

在Pytorch中也有相关的方法,将一个图像文件夹转换为,可以进行训练的dataset类型的数据。

在 Tensorflow 中使用 tf.data.Dataset.from_tensor_slices 进行 dataset 的转换。

在下面的代码中,使用 TensorFlow 的 tf.constant 函数将 image_paths 转换为一个常量张量。这个操作将 Python 列表转换为 TensorFlow 张量。

比如image_paths = ['path/to/image1.jpg', 'path/to/image2.png', ...]

tf.data.Dataset.from_tensor_slices(filename_tensor) 将 filename_tensor 切片为一个dataset,其中每个元素对应 image_paths 列表中的一个路径字符串。

def get_dataset(image_paths image_type resize_shape channels):
    filename_tensor = tf.constant(image_paths)
    dataset = tf.data.Dataset.from_tensor_slices(filename_tensor)
    def _map_fn(filename):
        return decode_image(filename image_type resize_shape channels=channels)
    return dataset.map(_map_fn)

另外针对以上的操作结果,还可以进行dataset = dataset.batch(32)的批次操作,将数据集转换为模型训练用的输入管道。

Iterator

和Pytorch的循环训练一样,Tensorflow 在训练或评估 loop 中使用迭代器的get_next()方法,来获取每一批次的图像数据,进行模型的训练或评估。

tf.compat.v1.data.make_one_shot_iterator 是一种创建迭代器的方式,它会在启动时将整个数据集读入内存,这种方式适用于较小的数据集。

但对于大型数据集,通常使用 tf.data.Dataset.make_initializable_iterator 来创建可初始化的迭代器,以节省内存。

import tensorflow as tf

def get_image_data(image_paths image_type=None resize_shape=None channels=0):
    dataset = get_dataset(image_paths image_type resize_shape channels=channels)
    iterator = tf.compat.v1.data.make_one_shot_iterator(dataset)
    next_image = iterator.get_next()
    image_data_list = []
    with tf.compat.v1.Session() as sess:
        for i in range(len(image_paths)):
            image_data = sess.run(next_image)
            image_data_list.append(image_data)
    return image_data_list  

next_image = iterator.get_next()从迭代器中获取下一个数据元素,即一个图像数据。

get_next() 返回一个张量,表示下一个图像数据。每次调用这个方法时,都会从数据集中获取下一个图像。

with tf.compat.v1.Session() as sess:可以创建一个Tensorflow的会话。

接着遍历每个文件路径,sess.run(next_image)操作每次可以获取一个图像,然后将获取的图像,追加到列表中。

PIL Module

虽然 TensorFlow 提供了一定的图像处理能力,但在某些特殊场景下,借助成熟的 PIL 库可以让图像处理工作更加简单、高效,并获得更好的兼容性和可移植性。其中子模块Image就是其中常用的工具包。

下面的代码,就实现了转换,改变大小,重采样过滤等步骤。

import numpy as np
from PIL import Image ImageFilter

# Load and resize an image using PIL, and return its pixel data
def pil_resize_image(
    image_path
    resize_shape
    image_mode='RGBA'
    image_filter=None
):
    im = Image.open(image_path)
    converted_im = im.convert(image_mode)
    resized_im = converted_im.resize(resize_shape Image.LANCZOS)
    if image_filter is not None:
        resized_im = resized_im.filter(image_filter)
    im_data = resized_im.getdata()
    return np.asarray(im_data)

Pytorch & Tensorflow Image Processing

PyTorch 和 TensorFlow 都提供了处理图像数据的工具和方法,但具体实现方式有所不同。

PyTorch 中,通常使用 torchvision 模块来处理图像数据,其中包括了常用的图像转换和数据加载功能。以下是一些常见的图像处理方法:

  1. 数据加载:
    • datasets.ImageFolder: 从文件夹中读取图像,可以自动进行图像-标签的对应。
    • DataLoader: 用于批量加载数据,支持多进程加载、随机打乱等功能。
  2. 图像转换:
    • transforms: 提供了一系列的图像转换操作,如 ResizeRandomCropToTensor 等,可以组合使用。
  3. 图像读取:
    • io.read_image: 从文件或二进制数据中读取图像。
  4. 图像显示:
    • torchvision.utils.make_grid: 将一批图像合并为网格图像。
    • torchvision.utils.save_image: 保存图像到文件。

TensorFlow 中,图像处理通常使用 tf.keras.preprocessing 模块,以及一些底层的 TensorFlow 操作。常见的图像处理方法包括:

  1. 数据加载:
    • tf.keras.preprocessing.image_dataset_from_directory: 从目录中加载图像数据集。
    • tf.data.Dataset: 构建数据管道,支持预处理、批量等操作。
  2. 图像转换:
    • tf.image: 提供了一系列图像处理函数,如 resizerot90rgb_to_grayscale 等。
    • tf.keras.preprocessing.image: 包含一些常用的数据增强方法。
  3. 图像读取:
    • tf.io.read_file: 从文件路径中读取图像文件。
    • tf.io.decode_image: 解码图像文件到张量表示。
  4. 图像显示:
    • tf.keras.preprocessing.image.array_to_img: 将数据转换为 PIL 图像对象。
    • 使用 Matplotlib 等外部库显示图像。

总的来说,PyTorch 和 TensorFlow 都提供了全面的图像处理工具,可以方便地完成图像的加载、预处理、数据增强等任务。PyTorch 侧重于使用 torchvision 模块,而 TensorFlow 则更多地使用 tf.keras.preprocessing 以及底层的 TensorFlow 操作。在实际应用中,需要根据具体需求选择合适的方法和工具。

CNN - MNIST modeling

MNIST数据集有60000张训练图像和10000张测试图像。训练数据是20变长的像素点的灰度图像。然后他们被重新填充和resize修改为28像素边长的图像。

针对该数据集,那么如下代码,使得批次为16,输入数据的形状就是(batch_size, self.input_dim**2),边长dim是28像素长度,正方形。

batch_size = 16
dataset = dataset.batch(batch_size)
it = tf.compat.v1.data.make_one_shot_iterator(dataset)
inputs, labels = it.get_next()
with tf.compat.v1.Session() as sess:
    # Batch of data size 16
    input_arr, label_arr = sess.run(
        (inputs, labels))

tf.reshape(inputs, shape)允许对图像进行变形操作,可以使用-1方法,使得该位的像素可以灵活处理,满足其他位像素的大小要求,以保证变形前后的总像素数量一致。

针对该数据集就是reshaped_inputs = tf.reshape(inputs, [-1, self.input_dim, self.input_dim, 1])

Convolution

卷积操作永远是图像处理中最重要的模块。包括了滤波器filter,偏差bias,填充padding。

滤波器通过核矩阵kernel定义,每一个channel都会分配一个滤波器进行卷积操作。我的直观理解,滤波器是提取特征的工具,曾经看过油管的一个视频,处理了一张马里奥的图像,经过了滤波器的卷积,图像的边缘被锐化,更加明显地突出了图形。

另外channel不能简单理解为RGB等颜色通道,可以将channel理解为一个神经网络的一个特征,随着被layer处理,channel数量也会不断变化,核函数就像是神经网络中的神经元,对每一个特征进行特征处理。最相似的,是这里的每一层各个kernel处理后也要加上一个bias,然后得到output矩阵。

Padding的目的是不遗漏边缘的特征。当滤波器无法很好地覆盖整个图像的时候,就会丢失边缘部分的信息,这时候用padding在周边打0,就可以覆盖了。为什么打0呢,单纯就是为了降低dot计算的复杂度。

卷积操作的主要目的是,提取特征,通过学习到的卷积核在输入图像上滑动,生成包含检测到的特征的特征图

Tensorflow中对应的方法是:tf.keras.layers.Conv2D

Max Pooling

卷积操作,是一种特征或者,特征图的提取,特征图我们上面说了也是下一层的一种输入。

相比较,池化的本质是一种下采样,目的是减少数据量,控制过拟合,增强特征的鲁棒性,池化不会改变特征的位置,只会缩小图像的尺寸,而且它也不会像卷积一样,每次学习矩阵内部的信息,而是通过Max或者Min,或者Average方法,提取对应范围内的最大或者最小或者平均值,作为输出,生成下采样后的特征图。

总之,卷积负责检测和表示特征,池化则负责简化数据结构,使模型更简洁、更高效。

Tensorflow中对应的方法是:tf.keras.layers.MaxPool2D

Multiple Layers

图像识别CNN也是一种深度神经网络,所以深层当然是一个重点。

当进行图像识别的时候,每一个层次会有不同的特征提取内容:

多层模型通过逐步提取高层特征,使得模型对图像的细微变化(如光照、角度、尺度)具有更强的鲁棒性,提升了泛化能力。

同时通过多次的卷积核池化可以减少特征量,降低复杂性,逐步降维,降噪,去除冗余信息。

在最后的例子中,虽然只加了一层,但是实际中的模型,有更深的层次。

Fully-Connected

图像识别的最终目的是分类,我们每天都在无意识的,判断,分类。分类的最终目的地,都是将特征,映射到最后的样本空间。

全连接层,就是将图像负责的特征,映射到复杂的非线形关系中,从而进行分类或者回归。

进行全连接层的时候,将出了批次信息之外的信息,都拉展为一个2D张量。然后将该2D张量作为输出,生成一个密集层,指定非线性激活函数,输出最终的对数几率。

Dropout

Dropout是一种正则化技术,目的有两个。

第一,是防止过拟合,尤其是由于共同适应性引起的过拟合。

神经元的共同适应性(co-adaptation)是指某些神经元过度依赖于其他神经元的特定输出,而不是独立地学习有用的特征。Dropout通过随机地禁用神经元,使得每个神经元不得不独立工作,学习更通用和有用的特征,减少了共同适应性。

第二,是提高泛化能力和鲁棒性。

由于每次学习的网络结构都不一样,使得每次进行训练的都是不一样的子网络,这相当于通过多次学习不同的内容,最终得出每次学习时候的重要特征,进行结合使用。

这让人联想到多头注意力机制,也是同样的一种方式。

Logits & Classification

由于是多分类问题,这里同样是输出识别图像的十个分类的对数几率logits。

模型构架结束后就可以进行训练loop。

在输出结果后,得到的是对数几率logits,需要对logits进行softmax方法,转换为我们使用的概率,然后通过计算和真实标签的差异,得到最后的准确度。

Final Code

使用上述的各种方法,对MNIST数据集进行建模和训练:

import tensorflow as tf

class MNISTModel(object):
    # Model Initialization
    def __init__(self, input_dim, output_size):
        self.input_dim = input_dim
        self.output_size = output_size
    
    # CNN Layers
    def model_layers(self, inputs, is_training):
        reshaped_inputs = tf.reshape(inputs, [-1, self.input_dim, self.input_dim, 1])

        conv1 = tf.keras.layers.Conv2D(
            name = 'conv1',
            filters = 32,
            kernel_size = 5,
            padding = 'same',
            activation = 'relu' # or tf.nn.relu
        )(reshaped_inputs)
        
        pool1 = tf.keras.layers.MaxPool2D(
            pool_size=[2, 2],
            strides=2,
            name='pool1'
        )(conv1)

        conv2 = tf.keras.layers.Conv2D(
            filters=64,
            kernel_size=[5, 5],
            padding='same',
            activation='relu',
            name='conv1'
        )(pool1)

        pool2 = tf.keras.layers.MaxPool2D(
            pool_size=[2, 2],
            strides=2,
            name='pool2'
        )(conv2)

        hwc = pool2.shape.as_list()[1:]
        flattened_size = hwc[0] * hwc[1] * hwc[2]
        pool2_flat = tf.reshape(pool2, [-1, flattened_size])
        
        dense = tf.keras.layers.Dense(
            units=1024,
            activation='relu',
            name='dense'
        )(pool2_flat)

        dropout = tf.keras.layers.Dropout(
            rate=0.4
        )(dense, training=is_training)

        logits = tf.keras.layers.Dense(
            self.output_size,
            name='logits'
        )(dropout)
        
        return logits

Traning Function:

import tensorflow as tf

def run_model_setup(self, inputs, labels, is_training):
    """
    设置模型,计算损失和准确率,进行训练(如果is_training为True)
    
    Args:
        inputs (tf.Tensor): 输入数据
        labels (tf.Tensor): 标签数据
        is_training (bool): 是否为训练模式
        
    Returns:
        None
    """
    
    # 通过模型层获取logits
    logits = self.model_layers(inputs, is_training)
    
    # 将logits转换为概率分布
    probs = tf.nn.softmax(logits, name='probs')
    
    # 获取预测标签
    predictions = tf.math.argmax(probs, axis=-1, name='predictions')
    
    # 获取真实标签
    class_labels = tf.math.argmax(labels, axis=-1)
    
    # 计算预测是否正确
    is_correct = tf.math.equal(predictions, class_labels)
    is_correct_float = tf.cast(is_correct, tf.float32)
    
    # 计算准确率
    accuracy = tf.math.reduce_mean(is_correct_float)
    
    # 保存概率分布和准确率
    self.probs = probs
    self.predictions = predictions
    self.accuracy = accuracy
    
    # 如果为训练模式,计算损失并进行训练
    if is_training:
        # 将标签转换为float32
        labels_float = tf.cast(labels, tf.float32)
        
        # 计算交叉熵损失
        cross_entropy = tf.nn.softmax_cross_entropy_with_logits_v2(
            labels=labels_float, logits=logits)
        loss = tf.math.reduce_mean(cross_entropy)
        
        # 使用Adam优化器进行训练
        optimizer = tf.compat.v1.train.AdamOptimizer()
        train_op = optimizer.minimize(loss, global_step=self.global_step)
        
        # 保存损失和训练操作
        self.loss = loss
        self.train_op = train_op