Gitlab: jfolz/simplejpeg

simplejpeg 是基于 libturbojpeg 的快速 JPEG 编码和解码库(turbojpeg 针对 ARM 和 X86 做了优化,宣称其速度是 libjpeg 的 2 到 4 倍).

1. simplejpeg 安装

关于库:

Pillow and OpenCV are excellent options for handling JPEG images and a variety of other formats.

If all you want is to read or write a couple of images and don't worry about the details, this package is not for you.

Keep reading if you care about speed and want more control over how your JPEGs are handled.

These are the reasons why I started making this:

  1. Pillow is very slow compared to OpenCV.
  2. Pillow only accepts streams as input. Images in memory have to be wrapped in BytesIO or similar. This adds to the slowness.
  3. OpenCV is gigantic, only accepts Numpy arrays as input, and returns images as BGR instead of RGB.
  4. Recent versions of libturbojpeg offer impressive speed gains on modern processors. Linux distributions and libraries tend to ship very old versions.

This library is especially for you if you need:

  1. Speed.
  2. Read and write directly from/to memory.
  3. Advanced features of the underlying library.

安装:

#Cython && cmake>=2.8.12
pip install simplejpeg

2. simplejpeg 函数

simplejpeg 库提供了四个函数:decode_jpeg_header, decode_jpeg, encode_jpeg, is_jpeg.

2.1. decode_jpeg_header

函数定义:

decode_jpeg_header(
    data: Any, # JPEG data held in memory
    min_height: SupportsInt=0,  # minimum height, in pixels, of the decoded image
    min_width: SupportsInt=0,   # minimum width, in pixels, of the decoded image
    min_factor: SupportsFloat=1,
) -> (SupportsInt, SupportsInt, Text, Text)
# Returns: (height: int, width: int, colorspace: str, color subsampling: str)

仅解码内存中 JPEG(JFIF)数据的 header. 速度非常快,每秒可处理 100000+ 张图片. 解码后返回图像的 height 和 width,以及以字符串表示的 colorspace 和 subsampling.

2.2. decode_jpeg

函数定义:

def decode_jpeg(
    data: SupportsBuffer,
    colorspace: Text='RGB', # target color space of the decoded image
    fastdct: Any=False,     # if True, use a faster DCT: upstream docs cite ~4-5% faster decoding for a minor quality loss
    fastupsample: Any=False,# if True, use faster color upsampling: upstream docs cite ~4-5% faster decoding for a minor quality loss
    min_height: SupportsInt=0,
    min_width: SupportsInt=0,
    min_factor: SupportsFloat=1,
    buffer: SupportsBuffer=None,
) -> np.ndarray
# Returns: the decoded image as a numpy.ndarray

从内存中的 JPEG(JFIF)数据解码出 JPEG 图像.

其中,colorspace:颜色空间,可选项有:'RGB', 'BGR', 'RGBX', 'BGRX', 'XBGR', 'XRGB', 'GRAY', 'RGBA', 'BGRA', 'ABGR', 'ARGB'; 'CMYK' 仅适用于图像已经是 CMYK 空间.

2.3. encode_jpeg

函数定义:

def encode_jpeg(
        image: numpy.ndarray,    # uncompressed image as uint8 array
        quality: SupportsInt=85, # JPEG compression quality
        colorspace: Text='RGB',
        colorsubsampling: Text='444', # one of '444', '422', '420', '440', '411', 'Gray'
        fastdct: Any=True,
) -> bytes
# Returns: a bytes object

将 numpy array 图像编码为 JPEG(JFIF)字节串(bytes).

2.4. is_jpeg

函数定义:

def is_jpeg(data: SupportsBytes)

检查 bytes 对象是否包含 JPEG(JFIF)数据.

3. 示例

3.1. test_encode

#!/usr/bin/python3
#!--*-- coding: utf-8 --*--
import io

import numpy as np
from PIL import Image
import pytest

import simplejpeg


def mean_absolute_difference(a, b):
    """Return the mean of the element-wise absolute difference of two arrays.

    Both inputs are converted to float32 before subtracting, so unsigned
    integer inputs cannot wrap around.
    """
    lhs = a.astype(np.float32)
    rhs = b.astype(np.float32)
    return np.abs(lhs - rhs).mean()


def test_encode_decode():
    """Round-trip a random RGB image between simplejpeg and Pillow.

    Both directions use quality 85. The mean absolute error against the
    source image is printed and must lie strictly between 0 and 10.
    """
    np.random.seed(9)
    source = np.random.randint(0, 255, (1689, 1000, 3), dtype=np.uint8)

    # Encode with simplejpeg, decode with Pillow.
    jpeg_bytes = simplejpeg.encode_jpeg(source, 85)
    via_pillow = np.array(Image.open(io.BytesIO(jpeg_bytes)))
    error = mean_absolute_difference(source, via_pillow)
    print(error)
    assert 0 < error < 10

    # Encode with Pillow, decode with simplejpeg.
    stream = io.BytesIO()
    Image.fromarray(source, 'RGB').save(
        stream, format='JPEG', quality=85, subsampling=0)
    via_simplejpeg = simplejpeg.decode_jpeg(stream.getbuffer())
    error = mean_absolute_difference(source, via_simplejpeg)
    print(error)
    assert 0 < error < 10


def test_encode_decode_subsampling():
    """Round-trip a random RGB image with chroma subsampling enabled.

    Each simplejpeg subsampling name is paired with the Pillow
    ``subsampling`` code used for the reverse direction. The mean absolute
    error must lie strictly between 0 and 50 in both directions.
    """
    np.random.seed(9)
    source = np.random.randint(0, 255, (679, 657, 3), dtype=np.uint8)
    pillow_codes = {'422': 1, '420': 2, '440': 1, '411': 2}
    for subsampling, pillow_code in pillow_codes.items():
        # Encode with simplejpeg, decode with Pillow.
        jpeg_bytes = simplejpeg.encode_jpeg(
            source, 85, colorsubsampling=subsampling)
        via_pillow = np.array(Image.open(io.BytesIO(jpeg_bytes)))
        error = mean_absolute_difference(source, via_pillow)
        assert 0 < error < 50, subsampling

        # Encode with Pillow, decode with simplejpeg.
        stream = io.BytesIO()
        Image.fromarray(source, 'RGB').save(
            stream, format='JPEG', quality=85, subsampling=pillow_code)
        via_simplejpeg = simplejpeg.decode_jpeg(stream.getbuffer())
        error = mean_absolute_difference(source, via_simplejpeg)
        assert 0 < error < 50, subsampling


def test_encode_fastdct():
    """Encode with ``fastdct=True`` and check the error after a Pillow decode."""
    np.random.seed(9)
    source = np.random.randint(0, 255, (345, 5448, 3), dtype=np.uint8)
    # Encode with simplejpeg, decode with Pillow.
    jpeg_bytes = simplejpeg.encode_jpeg(source, 85, fastdct=True)
    with Image.open(io.BytesIO(jpeg_bytes)) as pil_image:
        roundtrip = np.array(pil_image)
    error = mean_absolute_difference(source, roundtrip)
    assert 0 < error < 10


def _colorspace_to_rgb(im, colorspace):
    ind = colorspace.index('R'), colorspace.index('G'), colorspace.index('B')
    out = np.zeros(im.shape[:2] + (3,))
    out[...] = im[:, :, ind]
    return out


def test_encode_grayscale():
    """Encode a single-channel image as grayscale and decode it with Pillow."""
    np.random.seed(486943)
    source = np.random.randint(0, 255, (589, 486, 1), dtype=np.uint8)
    # Encode with simplejpeg, decode with Pillow.
    jpeg_bytes = simplejpeg.encode_jpeg(source, 85, colorspace='gray')
    flat = np.array(Image.open(io.BytesIO(jpeg_bytes)))
    # Pillow returns a 2-D array; add a channel axis to match the source.
    roundtrip = flat[:, :, np.newaxis]
    error = mean_absolute_difference(source, roundtrip)
    assert 0 < error < 10


def test_encode_colorspace():
    """Encode the same random data under every 3- and 4-channel colorspace.

    For each colorspace the first ``len(colorspace)`` channels are encoded,
    decoded with Pillow, and compared against the source reordered to RGB.
    """
    np.random.seed(9)
    source = np.random.randint(0, 255, (589, 486, 4), dtype=np.uint8)
    colorspaces = ('RGB', 'BGR', 'RGBX', 'BGRX', 'XBGR',
                   'XRGB', 'RGBA', 'BGRA', 'ABGR', 'ARGB')
    for colorspace in colorspaces:
        channel_count = len(colorspace)
        pixels = np.ascontiguousarray(source[:, :, :channel_count])
        # Encode with simplejpeg, decode with Pillow.
        jpeg_bytes = simplejpeg.encode_jpeg(pixels, 85, colorspace=colorspace)
        roundtrip = np.array(Image.open(io.BytesIO(jpeg_bytes)))
        expected = _colorspace_to_rgb(pixels, colorspace)
        error = mean_absolute_difference(expected, roundtrip)
        assert 0 < error < 10


def test_encode_noncontiguous():
    """Encoding a non-contiguous array must raise ``ValueError``.

    The error message is expected to mention 'contiguous'.
    """
    # Build the input outside the ``raises`` block so that only the
    # encode call itself can satisfy the expected exception.
    planar = np.zeros((3, 123, 235), dtype=np.uint8)
    # Transposing gives a (123, 235, 3) view that is not C-contiguous.
    interleaved_view = planar.transpose((1, 2, 0))
    with pytest.raises(ValueError) as exc:
        simplejpeg.encode_jpeg(interleaved_view)
    assert 'contiguous' in str(exc.value)

3.2. test_decode

#!/usr/bin/python3
#!--*-- coding: utf-8 --*--
import glob
import os.path as pt

import numpy as np
from PIL import Image
import pytest

import simplejpeg


ROOT = pt.abspath(pt.dirname(__file__))


def mean_absolute_difference(a, b):
    """Return the mean absolute element-wise difference between two arrays.

    Inputs are cast to float32 first, which keeps unsigned integer
    subtraction from wrapping around.
    """
    delta = a.astype(np.float32) - b.astype(np.float32)
    magnitude = np.abs(delta)
    return magnitude.mean()


def yield_reference_images():
    """Yield one tuple per reference JPEG found in ``ROOT/images``.

    Yields:
        ``(file name, raw JPEG bytes, PIL image)`` for every ``*.jpg`` file.
        The PIL image is opened from the bytes already read, so no file
        handle is left open for the consumer to clean up.
    """
    import io  # local import: this snippet's import block does not include io

    pattern = pt.join(ROOT, 'images', '*.jpg')
    for image_path in glob.iglob(pattern):
        # Close the file before yielding so it is not held open while the
        # consumer processes the image.
        with open(image_path, 'rb') as fp:
            data = fp.read()
        yield pt.basename(image_path), data, Image.open(io.BytesIO(data))

          
def test_decode_header():
    """The header's height and width must match the size Pillow reports."""
    for name, jpeg_bytes, reference in yield_reference_images():
        header = simplejpeg.decode_jpeg_header(jpeg_bytes)
        # Colorspace and subsampling are returned too but not checked here.
        height, width, _colorspace, _subsampling = header
        expected_width = reference.size[0]
        expected_height = reference.size[1]
        assert width == expected_width and height == expected_height, name

def test_decode_rgb():
    """Default decoding must stay within a mean absolute error of 1 of Pillow's RGB."""
    for name, jpeg_bytes, reference in yield_reference_images():
        expected = np.array(reference.convert('RGB'))
        actual = simplejpeg.decode_jpeg(jpeg_bytes)
        error = mean_absolute_difference(expected, actual)
        assert error < 1, name


def test_decode_gray():
    """Grayscale decoding must stay within a mean absolute error of 1 of Pillow's 'L' mode."""
    for name, jpeg_bytes, reference in yield_reference_images():
        luminance = np.array(reference.convert('L'))
        # Add a trailing channel axis before comparing with the decoded array.
        expected = luminance[:, :, np.newaxis]
        actual = simplejpeg.decode_jpeg(jpeg_bytes, 'gray')
        error = mean_absolute_difference(expected, actual)
        assert error < 1, name


def test_decode_buffer():
    """Decoding into a caller-supplied bytearray must give the same pixels as Pillow."""
    # One buffer of 1024*1024*3 bytes is shared by all reference images.
    scratch = bytearray(1024*1024*3)
    for name, jpeg_bytes, reference in yield_reference_images():
        expected = np.array(reference.convert('RGB'))
        actual = simplejpeg.decode_jpeg(jpeg_bytes, buffer=scratch)
        error = mean_absolute_difference(expected, actual)
        assert error < 1, name


def test_decode_buffer_insufficient():
    """A buffer one byte short of the decoded image must be rejected.

    ``decode_jpeg`` is expected to raise ``ValueError`` with a message
    containing 'too small'.
    """
    for f, data, im in yield_reference_images():
        width, height = im.size
        # decode_jpeg defaults to colorspace='RGB'; this assumes 3 bytes per
        # pixel for the uint8 output. Sizing from im.size also avoids decoding
        # the whole image with Pillow just to count its pixels.
        required = width * height * 3
        b = bytearray(required - 1)
        with pytest.raises(ValueError) as excinfo:
            simplejpeg.decode_jpeg(data, buffer=b)
        assert 'too small' in str(excinfo.value), f


def test_decode_truncated():
    """Dropping the last byte of a JPEG must raise ``ValueError`` with a fixed message."""
    expected_message = 'Premature end of JPEG file'
    for name, jpeg_bytes, reference in yield_reference_images():
        truncated = jpeg_bytes[:-1]
        with pytest.raises(ValueError) as excinfo:
            simplejpeg.decode_jpeg(truncated)
        assert str(excinfo.value) == expected_message


def test_decode_fastdct():
    """With ``fastdct=True`` the mean absolute error against Pillow must stay below 1.5."""
    for name, jpeg_bytes, reference in yield_reference_images():
        expected = np.array(reference.convert('RGB'))
        actual = simplejpeg.decode_jpeg(jpeg_bytes, fastdct=True)
        error = mean_absolute_difference(expected, actual)
        assert error < 1.5, name


def test_decode_fastupsample():
    """With ``fastupsample=True`` the mean absolute error against Pillow must stay below 1.5."""
    for name, jpeg_bytes, reference in yield_reference_images():
        expected = np.array(reference.convert('RGB'))
        actual = simplejpeg.decode_jpeg(jpeg_bytes, fastupsample=True)
        error = mean_absolute_difference(expected, actual)
        assert error < 1.5, name


def test_decode_fastdct_fastupsample():
    """With both fast options enabled the mean absolute error against Pillow must stay below 2."""
    for name, jpeg_bytes, reference in yield_reference_images():
        expected = np.array(reference.convert('RGB'))
        actual = simplejpeg.decode_jpeg(
            jpeg_bytes, fastdct=True, fastupsample=True)
        error = mean_absolute_difference(expected, actual)
        assert error < 2, name


def test_decode_min_width_height():
    """Check how ``min_height``, ``min_width`` and ``min_factor`` affect the decoded size.

    The assertions expect a full-size result whenever one dimension must
    keep its original size or ``min_factor`` is 2.01, and a smaller result
    (but at least half size) when only half of one dimension is requested.
    """
    for name, jpeg_bytes, reference in yield_reference_images():
        width, height = reference.size
        half_width = width / 2
        half_height = height / 2

        # Half height allowed, but the original width is required.
        out = simplejpeg.decode_jpeg(
            jpeg_bytes, min_height=half_height, min_width=width)
        assert out.shape[0] == height, name

        # Half width allowed, but the original height is required.
        out = simplejpeg.decode_jpeg(
            jpeg_bytes, min_width=half_width, min_height=height)
        assert out.shape[1] == width, name

        # Half height.
        out = simplejpeg.decode_jpeg(jpeg_bytes, min_height=half_height)
        assert half_height <= out.shape[0] < height, name

        # Half height, but with a minimum factor greater than 2.
        out = simplejpeg.decode_jpeg(
            jpeg_bytes, min_height=half_height, min_factor=2.01)
        assert out.shape[0] == height, name

        # Half width.
        out = simplejpeg.decode_jpeg(
            jpeg_bytes, min_width=half_width, min_height=1)
        assert half_width <= out.shape[1] < width, name

        # Half width, but with a minimum factor greater than 2.
        out = simplejpeg.decode_jpeg(
            jpeg_bytes, min_width=half_width, min_height=1, min_factor=2.01)
        assert out.shape[1] == width, name


def _colorspace_to_rgb(im, colorspace):
    ind = colorspace.index('R'), colorspace.index('G'), colorspace.index('B')
    out = np.zeros(im.shape[:2] + (3,))
    out[...] = im[:, :, ind]
    return out


def test_decode_colorspace():
    """Decode every reference image into each supported colorspace.

    Colour results are reordered to RGB and compared with Pillow's RGB
    conversion; the 'GRAY' result is compared with Pillow's 'L' conversion.
    The mean absolute error must be below 1 in every case.
    """
    colorspaces = ('RGB', 'BGR', 'RGBX', 'BGRX', 'XBGR',
                   'XRGB', 'RGBA', 'BGRA', 'ABGR', 'ARGB')
    for f, data, im in yield_reference_images():
        # The Pillow reference does not depend on the colorspace under test,
        # so convert once per image instead of once per colorspace.
        np_im = np.array(im.convert('RGB'))
        for colorspace in colorspaces:
            sim = simplejpeg.decode_jpeg(data, colorspace)
            sim = _colorspace_to_rgb(sim, colorspace)
            # Name the colorspace as well so a failure is easy to locate.
            assert mean_absolute_difference(np_im, sim) < 1, (f, colorspace)
        np_im = np.array(im.convert('L'))[:, :, np.newaxis]
        sim = simplejpeg.decode_jpeg(data, 'GRAY')
        assert mean_absolute_difference(np_im, sim) < 1, f


def test_decode_writable():
    """Decoding into a read-only buffer must raise ``BufferError``.

    The check runs once per reference image. Wrapping the whole loop in a
    single ``pytest.raises`` would stop at the first image, because the
    expected exception ends the loop.
    """
    # bytes objects are immutable, so they cannot serve as an output buffer.
    b = b'x' * (1024*1024*3)
    for f, data, im in yield_reference_images():
        with pytest.raises(BufferError) as exc:
            simplejpeg.decode_jpeg(data, buffer=b)
        assert 'writable' in str(exc.value), f


def test_decode_noncontiguous():
    """Decoding into a non-contiguous buffer must raise ``ValueError``.

    The check runs once per reference image. Wrapping the whole loop in a
    single ``pytest.raises`` would stop at the first image, because the
    expected exception ends the loop.
    """
    b = np.zeros((3, 1024, 1024), dtype=np.uint8)
    # Transposing gives a (1024, 1024, 3) view that is not C-contiguous.
    noncontiguous = b.transpose((1, 2, 0))
    for f, data, im in yield_reference_images():
        with pytest.raises(ValueError) as exc:
            simplejpeg.decode_jpeg(data, buffer=noncontiguous)
        assert 'contiguous' in str(exc.value), f

3.3. test_is_jpeg

import os.path as pt
import glob
import simplejpeg


ROOT = pt.abspath(pt.dirname(__file__))

def yield_reference_images():
    """Yield ``(file name, raw bytes)`` for every ``*.jpg`` file in ``ROOT/images``."""
    pattern = pt.join(ROOT, 'images', '*.jpg')
    for path in glob.iglob(pattern):
        with open(path, 'rb') as handle:
            file_name = pt.basename(path)
            payload = handle.read()
            yield file_name, payload


def test_is_jpeg_with_jpeg():
    """Every reference image must be recognised as JPEG data."""
    for name, payload in yield_reference_images():
        recognised = simplejpeg.is_jpeg(payload)
        assert recognised, name


def test_is_jpeg_non_jpeg():
    """Overwriting the first byte of a JPEG must make ``is_jpeg`` reject it."""
    for name, payload in yield_reference_images():
        corrupted = bytearray(payload)
        # Replace the first byte with the code point of 'I'.
        corrupted[0] = ord('I')
        recognised = simplejpeg.is_jpeg(corrupted)
        assert not recognised, name

4. 实例

python实现不同电脑之间视频传输 - 2021.06.23

实现不同电脑之间视频传输
视频传输实现的前提是确保发送端和接收端接在同一个局域网下,分为发送端和接收端.

可采用 simplejpeg.encode_jpeg() 替换 cv2.imencode() .

[1] - 发送端

import socket
import time
import cv2
import imagezmq
import traceback
import simplejpeg

# Sender: read frames from a video source, JPEG-encode them with simplejpeg
# and publish them through imagezmq.

# capture = cv2.VideoCapture(0)  # alternative: read from the default camera
capture = cv2.VideoCapture("test.mp4")
# 192.168.100.104 is the IP address of the sending host
sender = imagezmq.ImageSender(connect_to='tcp://192.168.100.104:5555', REQ_REP=False)
rpi_name = socket.gethostname()  # host name, used to tag every message
time.sleep(2.0)
jpeg_quality = 95  # JPEG compression quality passed to encode_jpeg
try:
    while True:
        ok, frame = capture.read()
        if not ok:
            # No frame was returned (e.g. the video file is exhausted), so
            # stop cleanly instead of failing inside cv2.resize.
            break
        time.sleep(1/60)
        image = cv2.resize(frame, (1280, 720))
        # Message format: "<host name>*<send timestamp>"
        msg = rpi_name + '*' + str(time.time())
        # Encode the frame as JPEG with simplejpeg before sending it;
        # OpenCV frames are in BGR channel order.
        jpg_buffer = simplejpeg.encode_jpeg(image, quality=jpeg_quality,
                                            colorspace='BGR')
        sender.send_jpg(msg, jpg_buffer)
        cv2.imshow(rpi_name, image)
        cv2.waitKey(1)
except KeyboardInterrupt:
    # Ctrl-C is the normal way to stop the demo; exit without a traceback.
    pass
except Exception:
    # print_exc() writes the traceback itself; wrapping it in print() would
    # additionally print its return value, None.
    traceback.print_exc()
finally:
    capture.release()

[2] - 接收端(可以多台设备同时接收)

import cv2
import imagezmq
import traceback
import time
import simplejpeg

# Receiver: subscribe to the sender's stream; pass the sender's IP address here
image_hub = imagezmq.ImageHub(open_port='tcp://192.168.100.104:5555',REQ_REP=False)
frame_count = 1
# Reference time for the average frame rate, taken once before the first receive.
time1 = time.time()
while True:
    try:
        name, image = image_hub.recv_jpg()
        # Decode the JPEG payload with simplejpeg, in BGR order for OpenCV
        image = simplejpeg.decode_jpeg(image, colorspace='BGR')

        # The message is "<host name>*<timestamp>"; use the host name as window title.
        cv2.imshow(name.split('*')[0], image)
        cv2.waitKey(1)
        time2 = time.time()
        # Print the frame size and the average frames per second so far.
        print(image.shape[:2], int(frame_count/(time2-time1)))
        frame_count += 1
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the demo; exit without a traceback.
        break
    except Exception:
        print(traceback.format_exc())
        break
Last modification:December 20th, 2021 at 11:02 am