文章目录

上位机数据采集的缓存与压缩传输

发布于 2026-03-26 19:21:15 · 浏览 8 次 · 评论 0 条

上位机数据采集的缓存与压缩传输

高频数据采集场景下,若将每个采样点的数据立即通过网络发送至上位机或服务器,极易造成网络阻塞、丢包或数据库写入崩溃。通过在本地建立缓存机制,并配合数据压缩算法,能显著降低网络负载,提升系统稳定性。


整体架构设计

实施缓存与压缩传输的核心在于“攒批发货”:将高频的离散数据先暂存于内存,当达到一定数量或时间阈值后,打包压缩再一次性发送。

graph LR A[PLC/传感器] -->|实时数据| B["缓存区: Ring Buffer"] B -->|触发条件
数量/时间| C["压缩器: Gzip/Snappy"] C -->|二进制流| D["网络发送: TCP/MQTT"] D --> E[服务器解压入库]

第一步:构建本地内存缓存

使用环形缓冲区是最佳选择,它能以固定的内存空间循环写入数据,避免频繁的内存分配与释放。

  1. 定义 缓冲区结构体。
    在代码中创建一个类,用于管理数据队列。以下以 C# 为例,利用 Queue 或自行实现的循环数组。

    打开 你的上位机项目,新建一个名为 DataBuffer.cs 的文件。

  2. 配置 缓冲区参数。
    需设定两个核心参数:BufferSize(最大容量)和 TriggerThreshold(触发发送的阈值)。

    • BufferSize:建议设为预计每秒数据量的 5-10 倍。例如,每秒 1000 条,设为 5000 或 10000。
    • TriggerThreshold:建议设为 BufferSize 的 80% 或根据最大传输单元(MTU)反推,例如每 2000 条触发一次。
  3. 编写 核心入队逻辑。
    以下代码展示了一个基本的线程安全缓冲区实现:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public class DataBuffer
{
    // 使用线程安全队列
    private readonly ConcurrentQueue<string> _buffer;

    // 触发发送的阈值
    public int TriggerThreshold { get; set; } = 2000;

    public DataBuffer()
    {
        _buffer = new ConcurrentQueue<string>();
    }

    // 写入数据
    public void Enqueue(string dataItem)
    {
        _buffer.Enqueue(dataItem);

        // 检查是否达到触发条件
        if (_buffer.Count >= TriggerThreshold)
        {
            OnBufferFull?.Invoke();
        }
    }

    // 批量取出并清空
    public List<string> DequeueAll()
    {
        var list = new List<string>();
        while (_buffer.TryDequeue(out string item))
        {
            list.Add(item);
        }
        return list;
    }

    // 事件:当缓冲区满时触发
    public event Action OnBufferFull;
}

第二步:选择并实施压缩算法

数据在网络传输前必须序列化为字节数组。对于时序数据(如温度、压力),使用通用压缩算法(如 Gzip 或 Deflate)通常能获得 50% - 80% 的压缩率。

  1. 引入 命名空间。
    在代码头部 添加 using System.IO;using System.IO.Compression;

  2. 编写 压缩辅助函数。
    创建一个静态类 CompressionHelper,提供字符串列表到压缩字节数组的转换功能。

public static class CompressionHelper
{
    // 将字符串列表压缩为字节数组
    public static byte[] Compress(List<string> data)
    {
        using (var output = new MemoryStream())
        {
            // 使用 Gzip 流(比 Deflate 压缩率稍高,兼容性好)
            using (var gzipStream = new GZipStream(output, CompressionLevel.Optimal))
            {
                // 1. 先将数据转为特定格式的字符串(如 JSON 或 CSV)
                string rawData = string.Join("\n", data);

                // 2. 将字符串转为字节
                byte[] rawBytes = System.Text.Encoding.UTF8.GetBytes(rawData);

                // 3. 写入压缩流
                gzipStream.Write(rawBytes, 0, rawBytes.Length);
            }
            // 必须在流关闭后才能获取最终字节
            return output.ToArray();
        }
    }
}

第三步:集成发送逻辑

将缓存取出的数据经过压缩,通过 Socket 或 MQTT 发送。

  1. 绑定 缓冲区触发事件。
    在主程序初始化部分,实例化 缓冲区对象,并订阅其 OnBufferFull 事件。
// 初始化
var buffer = new DataBuffer();
buffer.OnBufferFull += () => 
{
    // 1. 从缓冲区取出所有数据
    var dataToSend = buffer.DequeueAll();

    // 2. 执行压缩
    byte[] compressedData = CompressionHelper.Compress(dataToSend);

    // 3. 发送数据
    SendToServer(compressedData);
};
  1. 实现 发送函数。
    根据实际使用的协议(TCP/WebSocket/MQTT)发送字节数组。以下为 TCP Socket 发送示例:
public void SendToServer(byte[] data)
{
    try
    {
        if (socket != null && socket.Connected)
        {
            // **发送** 压缩后的二进制流
            socket.Send(data);
        }
    }
    catch (Exception ex)
    {
        // 记录错误日志
        Console.WriteLine($"发送失败: {ex.Message}");
        // 发送失败应考虑重发机制,将数据重新放回队列
    }
}

第四步:优化策略与防堵塞处理

仅凭上述代码,在数据量极低时可能长时间达不到发送阈值,导致数据延迟;在网络故障时会导致内存溢出。

  1. 启用 定时器发送。
    除了“数量阈值”,必须增加“时间阈值”。

    添加 一个 System.Timers.Timer,间隔设为 1000ms(1秒)。

Timer timer = new Timer(1000);
timer.Elapsed += (sender, e) =>
{
    if (buffer.Count > 0)
    {
        var dataToSend = buffer.DequeueAll();
        byte[] compressedData = CompressionHelper.Compress(dataToSend);
        SendToServer(compressedData);
    }
};
timer.Start();
  1. 实施 丢弃策略。
    当网络断开且缓冲区持续增长,超过 BufferSize 的 90% 时,必须 丢弃 最旧的数据或阻止新数据写入,防止上位机内存溢出(OOM)。

    DataBufferEnqueue 方法中 修改 逻辑:

public void Enqueue(string dataItem)
{
    // 如果缓冲区过满,强制丢弃头部最旧的数据
    if (_buffer.Count >= BufferSize)
    {
        _buffer.TryDequeue(out _);
    }

    _buffer.Enqueue(dataItem);

    // ...其余触发逻辑
}

效能对比

实施上述方案前后的数据对比如下:

指标 优化前 (直发 JSON) 优化后 (缓存+Gzip)
网络包频率 1000 包/秒 1 包/秒 (攒批发送)
单包大小 ~200 字节 ~10-20 千字节
网络带宽占用 高 (含大量 HTTP/TCP 头) 低 (二进制流)
CPU 占用率 低 (频繁序列化) 中 (定时批量压缩)
系统稳定性 差 (易丢包/堵塞)

通过内存缓存攒批结合二进制压缩传输,上位机系统的网络吞吐效率可提升 10 倍以上,且能平稳应对突发流量高峰。

评论 (0)

暂无评论,快来抢沙发吧!

扫一扫,手机查看

扫描上方二维码,在手机上查看本文