无人机实战系列(三)本地摄像头+远程GPU转换深度图

news/2025/2/27 5:02:54

这篇文章将结合之前写的两篇文章 无人机实战系列(一)在局域网内传输数据 和 无人机实战系列(二)本地摄像头 + Depth-Anything V2 实现了以下功能:

  • 本地笔记本摄像头发布图像 + 远程GPU实时处理(无回传);
  • 【异步】本地笔记本摄像头发布图像 + 远程GPU实时处理(回传至笔记本并展示);
  • 【同步】本地笔记本摄像头发布图像 + 远程GPU实时处理(回传至笔记本并展示);

建议在运行这个demo之前先查看先前的两篇文章以熟悉 zmq 库与 Depth-Anything V2 这个模型;

这里之所以提供两个在是否回传上的demo是因为你需要根据自己的实际状况进行选择,尽管回传并显示深度图能够更加直观查看计算结果但你仍然需要平衡以下两个方面:

  • 深度本身体积较大,回传会占用通讯带宽;
  • 回传的图像本地显示会占用本地算力;

【注意】:这篇文章的代码需要在 无人机实战系列(二)本地摄像头 + Depth-Anything V2 中的文件夹下运行,否则会报错找不到对应的文件与模型。


本地笔记本摄像头发布图像 + 远程GPU实时处理(无回传)

这个demo实现了本地笔记本打开摄像头后将图像发布出去,远程GPU服务器接受到图像后使用 Depth-Anything V2 处理图像并展示。

本地笔记本发布摄像头图像

在下面的代码中有以下几点需要注意:

  1. 设置发布频率 send_fps,较低的发布频率可以让减少GPU端的压力;
  2. 设置发布队列大小 socket.setsockopt(zmq.SNDHWM, 1),让发布队列始终仅有当前帧画面,降低带宽压力;
  3. 设置仅保存最新消息 socket.setsockopt(zmq.CONFLATE, 1),让发布队列仅存储最新的画面,降低接受端的画面延迟;
python">import zmq
import cv2
import time

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5555")  # 本地绑定端口

socket.setsockopt(zmq.SNDHWM, 1)        # 发送队列大小为1
socket.setsockopt(zmq.CONFLATE, 1)      # 仅保存最新消息

cap = cv2.VideoCapture(0)  # 读取摄像头

send_fps = 30       # 限制传输的fps,降低接受方的处理压力

while True:
    start_time = time.time()
    ret, frame = cap.read()
    if not ret:
        continue
    _, buffer = cv2.imencode('.jpg', frame)  # 编码成JPEG格式
    socket.send(buffer.tobytes())  # 发送图像数据

    cv2.imshow("Origin image", frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
    
    time.sleep(max(1/send_fps - (time.time() - start_time), 0))

运行:

$ python camera_pub.py

在这里插入图片描述

GPU 服务器接受端

在下面的代码中有以下几点需要注意:

  1. 绑定发布端地址 socket.connect("tcp://192.168.75.201:5555"),根据你笔记本的地址进行修改;
  2. 仅接受最新消息 socket.setsockopt(zmq.CONFLATE, 1)
  3. 清空旧数据帧 socket.setsockopt(zmq.RCVHWM, 1) && socket.poll(1)
python">import argparse
import cv2
import numpy as np
import torch
import time
import zmq

from depth_anything_v2.dpt import DepthAnythingV2

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://192.168.75.201:5555")		# 远程发布端地址
socket.setsockopt(zmq.SUBSCRIBE, b"")
socket.setsockopt(zmq.CONFLATE, 1)      # 仅接受最新消息
socket.setsockopt(zmq.RCVHWM, 1)        # 清空旧数据帧

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Depth Anything V2')
    parser.add_argument('--input-size', type=int, default=518)
    parser.add_argument('--encoder', type=str, default='vits', choices=['vits', 'vitb', 'vitl', 'vitg'])
    parser.add_argument('--pred-only', action='store_true', help='only display the prediction')
    parser.add_argument('--grayscale', action='store_true', help='do not apply colorful palette')
    
    args = parser.parse_args()

    DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
    
    model_configs = {
        'vits': {'encoder': 'vits', 'features': 64, 'out_channels': [48, 96, 192, 384]},
        'vitb': {'encoder': 'vitb', 'features': 128, 'out_channels': [96, 192, 384, 768]},
        'vitl': {'encoder': 'vitl', 'features': 256, 'out_channels': [256, 512, 1024, 1024]},
        'vitg': {'encoder': 'vitg', 'features': 384, 'out_channels': [1536, 1536, 1536, 1536]}
    }
    
    depth_anything = DepthAnythingV2(**model_configs[args.encoder])
    depth_anything.load_state_dict(torch.load(f'./models/depth_anything_v2_{args.encoder}.pth', map_location='cpu'))
    depth_anything = depth_anything.to(DEVICE).eval()

    margin_width = 50
    
    while True:
        start_time = time.time()
        
        # **优化 1: ZMQ 数据接收**
        try:
            while socket.poll(1):  # 尝试不断读取新数据,丢弃旧数据
                msg = socket.recv(zmq.NOBLOCK)
            zmq_time = time.time()
            
            # **优化 2: OpenCV 解码**
            raw_frame = cv2.imdecode(np.frombuffer(msg, dtype=np.uint8), 1)
            decode_time = time.time()

            # **优化 3: 模型推理**
            with torch.no_grad():
                depth = depth_anything.infer_image(raw_frame, args.input_size)
            infer_time = time.time()

            # **优化 4: 归一化 + OpenCV 伪彩色映射**
            depth = ((depth - depth.min()) / (depth.max() - depth.min()) * 255).astype(np.uint8)
            if args.grayscale:
                depth = np.repeat(depth[..., np.newaxis], 3, axis=-1)
            else:
                depth = cv2.applyColorMap(depth, cv2.COLORMAP_JET)
            process_time = time.time()

            # **优化 5: 合并图像**
            split_region = np.ones((raw_frame.shape[0], margin_width, 3), dtype=np.uint8) * 255
            combined_frame = cv2.hconcat([raw_frame, split_region, depth])
            cv2.imshow('Raw Frame and Depth Prediction', combined_frame)
            
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

            print(f"[{args.encoder}] Frame cost time: {time.time() - start_time:.4f} s")
            print(f"  ZMQ receive:  {zmq_time - start_time:.4f} s")
            print(f"  Decode:       {decode_time - zmq_time:.4f} s")
            print(f"  Inference:    {infer_time - decode_time:.4f} s")
            print(f"  Processing:   {process_time - infer_time:.4f} s")

        except zmq.Again:
            print("No msg received, skip...")
            continue  # 没有消息就跳过

    cv2.destroyAllWindows()

运行:

python">$ python camera_recv.py

在这里插入图片描述


【异步】本地笔记本摄像头发布图像 + 远程GPU实时处理(回传至笔记本并展示)

和上面的代码基本一致,只不过在发送与接收端都增加了一个收发对象,通常情况下使用异步方式处理收发因为可以避免一端服务来不及处理而导致另一端持续等待。

本地笔记本发布摄像头图像

python">import zmq
import cv2
import numpy as np
import time

context = zmq.Context()

# 发布原始数据
pub_socket = context.socket(zmq.PUB)
pub_socket.bind("tcp://*:5555")  # 发布数据

# 接收处理结果
pull_socket = context.socket(zmq.PULL)
pull_socket.bind("tcp://*:5556")  # 监听处理方返回数据

send_fps = 30

cap = cv2.VideoCapture(0)

while True:
    start_time = time.time()
    ret, frame = cap.read()
    if not ret:
        continue

    # [可选] 图像降采样
    frame = cv2.pyrDown(frame)
    frame = cv2.pyrDown(frame)

    _, buffer = cv2.imencode('.jpg', frame)  # 压缩图像
    pub_socket.send(buffer.tobytes())  # 发布数据

    # 非阻塞接收处理结果
    try:
        processed_data = pull_socket.recv(zmq.NOBLOCK)
        processed_frame = cv2.imdecode(np.frombuffer(processed_data, dtype=np.uint8), 1)
    except zmq.Again:
        print("No image received, continue...")
        continue

    cv2.imshow("Processed Frame", processed_frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

    time.sleep(max(1/send_fps - (time.time() - start_time), 0))

cv2.destroyAllWindows()

运行:

python">$ python camera_pub_async.py

在这里插入图片描述

GPU 服务器接受端(异步)

python">import argparse
import cv2
import numpy as np
import torch
import time
import zmq

from depth_anything_v2.dpt import DepthAnythingV2

context = zmq.Context()
sub_socket = context.socket(zmq.SUB)
sub_socket.connect("tcp://192.168.75.201:5555")
sub_socket.setsockopt(zmq.SUBSCRIBE, b"")
sub_socket.setsockopt(zmq.CONFLATE, 1)      # 仅接受最新消息
sub_socket.setsockopt(zmq.RCVHWM, 1)        # 清空旧数据帧


# 发送处理结果
push_socket = context.socket(zmq.PUSH)
push_socket.connect("tcp://192.168.75.201:5556")

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Depth Anything V2')
    parser.add_argument('--input-size', type=int, default=518)
    parser.add_argument('--encoder', type=str, default='vits', choices=['vits', 'vitb', 'vitl', 'vitg'])
    parser.add_argument('--pred-only', action='store_true', help='only display the prediction')
    parser.add_argument('--grayscale', action='store_true', help='do not apply colorful palette')
    
    args = parser.parse_args()

    DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
    
    model_configs = {
        'vits': {'encoder': 'vits', 'features': 64, 'out_channels': [48, 96, 192, 384]},
        'vitb': {'encoder': 'vitb', 'features': 128, 'out_channels': [96, 192, 384, 768]},
        'vitl': {'encoder': 'vitl', 'features': 256, 'out_channels': [256, 512, 1024, 1024]},
        'vitg': {'encoder': 'vitg', 'features': 384, 'out_channels': [1536, 1536, 1536, 1536]}
    }
    
    depth_anything = DepthAnythingV2(**model_configs[args.encoder])
    depth_anything.load_state_dict(torch.load(f'./models/depth_anything_v2_{args.encoder}.pth', map_location='cpu'))
    depth_anything = depth_anything.to(DEVICE).eval()

    margin_width = 50
    
    while True:
        start_time = time.time()
        
        # **优化 1: ZMQ 数据接收**
        try:
            while sub_socket.poll(1):  # 尝试不断读取新数据,丢弃旧数据
                msg = sub_socket.recv(zmq.NOBLOCK)
            msg = sub_socket.recv()
            zmq_time = time.time()
            
            # **优化 2: OpenCV 解码**
            raw_frame = cv2.imdecode(np.frombuffer(msg, dtype=np.uint8), 1)
            decode_time = time.time()

            # **优化 3: 模型推理**
            with torch.no_grad():
                depth = depth_anything.infer_image(raw_frame, args.input_size)
            infer_time = time.time()

            # **优化 4: 归一化 + OpenCV 伪彩色映射**
            depth = ((depth - depth.min()) / (depth.max() - depth.min()) * 255).astype(np.uint8)
            if args.grayscale:
                depth = np.repeat(depth[..., np.newaxis], 3, axis=-1)
            else:
                depth = cv2.applyColorMap(depth, cv2.COLORMAP_JET)
            process_time = time.time()

            # **优化 5: 合并图像**
            split_region = np.ones((raw_frame.shape[0], margin_width, 3), dtype=np.uint8) * 255
            combined_frame = cv2.hconcat([raw_frame, split_region, depth])
            cv2.imshow('Raw Frame and Depth Prediction', combined_frame)
            
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

            print(f"[{args.encoder}] Frame cost time: {time.time() - start_time:.4f} s")
            print(f"  ZMQ receive:  {zmq_time - start_time:.4f} s")
            print(f"  Decode:       {decode_time - zmq_time:.4f} s")
            print(f"  Inference:    {infer_time - decode_time:.4f} s")
            print(f"  Processing:   {process_time - infer_time:.4f} s")

            _, buffer = cv2.imencode('.jpg', combined_frame)
            push_socket.send(buffer.tobytes())  # 发送回处理结果

        except zmq.Again:
            print("No msg received, skip...")
            continue  # 没有消息就跳过


    cv2.destroyAllWindows()

运行:

$ python camera_recv_async.py

在这里插入图片描述


【同步】本地笔记本摄像头发布图像 + 远程GPU实时处理(回传至笔记本并展示)

通常情况下这种视频流的传递不会考虑同步方式,因为这需要发布方与接收端保持一致,对网络稳定性有较高的要求。

本地笔记本发布摄像头图像

这个demo需要注意以下几点:

  1. 设置发送端为请求响应模式 context.socket(zmq.REQ)
  2. 阻塞等待服务器回传数据 pub_socket.recv()
python">import zmq
import cv2
import numpy as np
import time

context = zmq.Context()

# 发布原始数据
pub_socket = context.socket(zmq.REQ)        # 使用请求响应模式
pub_socket.bind("tcp://*:5555")  # 发布数据

send_fps = 30
cap = cv2.VideoCapture(0)

while True:
    start_time = time.time()
    ret, frame = cap.read()
    if not ret:
        continue

    # [可选] 图像降采样
    frame = cv2.pyrDown(frame)
    frame = cv2.pyrDown(frame)
    try:
        _, buffer = cv2.imencode('.jpg', frame)  # 压缩图像
        pub_socket.send(buffer.tobytes())  # 发布数据
        print("Waitting for server processed.")
        processed_data = pub_socket.recv()
        processed_frame = cv2.imdecode(np.frombuffer(processed_data, dtype=np.uint8), 1)
    except zmq.Again:
        print("No image received, continue...")
        continue

    cv2.imshow("Processed Frame", processed_frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

    time.sleep(max(1/send_fps - (time.time() - start_time), 0))

cv2.destroyAllWindows()

运行:

$ python camera_pub_sync.py

在这里插入图片描述

GPU 服务器接受端

这个demo需要注意以下几点:

  1. 设置接受端为请求响应模式 context.socket(zmq.REP)
  2. 阻塞接受发布端数据 sub_socket.recv()
  3. 将处理好的数据进行同步回传 sub_socket.send(buffer.tobytes())
python">import argparse
import cv2
import numpy as np
import torch
import time
import zmq

from depth_anything_v2.dpt import DepthAnythingV2

context = zmq.Context()
sub_socket = context.socket(zmq.REP)
sub_socket.connect("tcp://192.168.75.201:5555")

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Depth Anything V2')
    parser.add_argument('--input-size', type=int, default=518)
    parser.add_argument('--encoder', type=str, default='vits', choices=['vits', 'vitb', 'vitl', 'vitg'])
    parser.add_argument('--pred-only', action='store_true', help='only display the prediction')
    parser.add_argument('--grayscale', action='store_true', help='do not apply colorful palette')
    
    args = parser.parse_args()

    DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
    
    model_configs = {
        'vits': {'encoder': 'vits', 'features': 64, 'out_channels': [48, 96, 192, 384]},
        'vitb': {'encoder': 'vitb', 'features': 128, 'out_channels': [96, 192, 384, 768]},
        'vitl': {'encoder': 'vitl', 'features': 256, 'out_channels': [256, 512, 1024, 1024]},
        'vitg': {'encoder': 'vitg', 'features': 384, 'out_channels': [1536, 1536, 1536, 1536]}
    }
    
    depth_anything = DepthAnythingV2(**model_configs[args.encoder])
    depth_anything.load_state_dict(torch.load(f'./models/depth_anything_v2_{args.encoder}.pth', map_location='cpu'))
    depth_anything = depth_anything.to(DEVICE).eval()

    margin_width = 50
    
    while True:
        start_time = time.time()
        
        # **优化 1: ZMQ 数据接收**
        try:
            msg = sub_socket.recv()
            zmq_time = time.time()
            
            # **优化 2: OpenCV 解码**
            raw_frame = cv2.imdecode(np.frombuffer(msg, dtype=np.uint8), 1)
            decode_time = time.time()

            # **优化 3: 模型推理**
            with torch.no_grad():
                depth = depth_anything.infer_image(raw_frame, args.input_size)
            infer_time = time.time()

            # **优化 4: 归一化 + OpenCV 伪彩色映射**
            depth = ((depth - depth.min()) / (depth.max() - depth.min()) * 255).astype(np.uint8)
            if args.grayscale:
                depth = np.repeat(depth[..., np.newaxis], 3, axis=-1)
            else:
                depth = cv2.applyColorMap(depth, cv2.COLORMAP_JET)
            process_time = time.time()

            # **优化 5: 合并图像**
            split_region = np.ones((raw_frame.shape[0], margin_width, 3), dtype=np.uint8) * 255
            combined_frame = cv2.hconcat([raw_frame, split_region, depth])
            cv2.imshow('Raw Frame and Depth Prediction', combined_frame)
            
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

            print(f"[{args.encoder}] Frame cost time: {time.time() - start_time:.4f} s")
            print(f"  ZMQ receive:  {zmq_time - start_time:.4f} s")
            print(f"  Decode:       {decode_time - zmq_time:.4f} s")
            print(f"  Inference:    {infer_time - decode_time:.4f} s")
            print(f"  Processing:   {process_time - infer_time:.4f} s")

            _, buffer = cv2.imencode('.jpg', combined_frame)
            sub_socket.send(buffer.tobytes())  # 发送回处理结果

        except zmq.Again:
            print("No msg received, skip...")
            continue  # 没有消息就跳过


    cv2.destroyAllWindows()

运行:

$ python camera_recv_sync.py

在这里插入图片描述


http://www.niftyadmin.cn/n/5869459.html

相关文章

labview实现有符号位16进制转二进制补码转真值

今天在用一个采集模块时,发现读出寄存器的数据是不同的,它有两种范围,一个时十六进制整型,一种是有符号位十六进制,对应的量程和范围也是不同的,针对之前读取温度没有出现负数的情况,应该是转成…

批量导出数据库表到Excel

这篇文章将介绍如何批量的将多个甚至成千上万的数据库表导出为Excel文件。 准备数据 如下图是数据库里的表,我们需要将它们全部导出为excel文件,这里以SQL Server数据库为例 新增导出 打开的卢导表工具,新建数据库连接,这里以S…

链表3(LinkedList)

1、双向不带头链表的实现 1.1 节点成员和构造方法 双向不带头链表相比于单向多了一个prev域,它能使链表获得前驱节点。 如上图是双向不带头链表的一个节点,它可以直接找到前驱和后继节点。 由上面的讲解可得到代码:(注意&#xf…

获取GitHub的OAuth2的ClientId和ClientSecrets

获取 GitHub OAuth2 登录所需的 client-id 和 client-secret 登录 GitHub:使用你的 GitHub 账号登录到 GitHub。访问开发者设置:点击右上角的头像,选择 Settings,然后在左侧导航栏中选择 Developer settings。创建新的 OAuth 应用…

被裁20240927 --- WSL-Ubuntu20.04安装cuda、cuDNN、tensorRT

cuda、cuDNN、tensorRT的使用场景 1. CUDA(Compute Unified Device Architecture) 作用: GPU 通用计算:CUDA 是 NVIDIA 的并行计算平台和编程模型,允许开发者直接利用 GPU 的并行计算能力,加速通用计算任…

springcloud nacos 整合seata解决分布式事务

文章目录 nacos安装Mysql5.7安装及表初始化seata server安装下载并解压seata安装包在conf文件夹修改file.conf文件向本地数据库导入seata需要的表修改registry.conf文件将seata配置信息添加到nacos配置中心启动seata server springcloud整合seata测试流程正常下单流程扣减库存失…

学习路程四 向量数据库Milvus安装与连接

前序 在之前,已经简单完成了文档的加载,分割,向量化这些步骤,最后得到了结果。但是这些数据都是一次性的。假设一个律师所,有几千上万份卷宗,不可能每次使用都重新向量化数据吧。 所以我们需要有一个地方存…

LangChain系列:精通LangChain的合并文档链

LangChain的合并链旨在解决语言模型处理长文本时的上下文限制问题,包含Stuff、MapReduce、Refine和Rerank四种策略。Stuff链通过简单拼接文档块实现快速处理,适用于短文本但受限于模型token容量;MapReduce链采用分治思想,先独立处…