Skip to content

Commit

Permalink
[feat]新增IOwnerPacket,完成内存所有权转移的梳理
Browse files Browse the repository at this point in the history
  • Loading branch information
nnhy committed Sep 11, 2024
1 parent 335140b commit 8f73218
Show file tree
Hide file tree
Showing 22 changed files with 154 additions and 71 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/publish-beta.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ name: publish-beta

on:
push:
branches: [ master, dev, v10 ]
branches: [ master, dev ]
paths:
- 'NewLife.Core/**'
pull_request:
branches: [ master, dev, v10 ]
branches: [ master, dev ]
paths:
- 'NewLife.Core/**'
workflow_dispatch:
Expand Down
4 changes: 4 additions & 0 deletions NewLife.Core/Data/DbTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,8 @@ public void WriteData(Binary bn, Int32[] fields)
/// <returns></returns>
public IPacket ToPacket()
{
// 不确定所需大小,只能使用内存流,再包装为数据包。
// 头部预留8个字节,方便网络传输时添加协议头。
var ms = new MemoryStream
{
Position = 8
Expand All @@ -423,6 +425,8 @@ public IPacket ToPacket()
Write(ms);

ms.Position = 8;

// 包装为数据包,直接窃取内存流内部的缓冲区
return new ArrayPacket(ms);
}

Expand Down
47 changes: 31 additions & 16 deletions NewLife.Core/Data/IPacket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ public interface IPacket
IPacket Slice(Int32 offset, Int32 count = -1);
}

/// <summary>拥有管理权的数据包。使用完以后需要释放</summary>
public interface IOwnerPacket : IPacket, IDisposable
{
/// <summary>是否拥有管理权</summary>
Boolean HasOwner { get; set; }
}

/// <summary>内存包辅助类</summary>
public static class PacketHelper
{
Expand Down Expand Up @@ -247,7 +254,7 @@ public static Byte[] ReadBytes(this IPacket pk, Int32 offset = 0, Int32 count =
/// <remarks>
/// 使用时务必明确所有权归属,用完后及时释放。
/// </remarks>
public struct OwnerPacket : IDisposable, IPacket
public struct OwnerPacket : IDisposable, IPacket, IOwnerPacket
{
#region 属性
private IMemoryOwner<Byte> _owner;
Expand Down Expand Up @@ -427,7 +434,7 @@ public IPacket Slice(Int32 offset, Int32 count)
}

/// <summary>字节数组包</summary>
public struct ArrayPacket : IDisposable, IPacket
public struct ArrayPacket : IDisposable, IPacket, IOwnerPacket
{
#region 属性
private Byte[] _buffer;
Expand Down Expand Up @@ -455,6 +462,9 @@ public struct ArrayPacket : IDisposable, IPacket

/// <summary>总长度</summary>
public Int32 Total => Length + (Next?.Total ?? 0);

/// <summary>空数组</summary>
public static ArrayPacket Empty = new([]);
#endregion

#region 索引
Expand Down Expand Up @@ -504,6 +514,7 @@ public Byte this[Int32 index]
}
#endregion

#region 构造
/// <summary>通过指定字节数组来实例化数据包</summary>
/// <param name="buf"></param>
/// <param name="offset"></param>
Expand Down Expand Up @@ -563,11 +574,8 @@ public ArrayPacket(Stream stream)
#endif
}

//Set(stream.ToArray());

var buf = new Byte[stream.Length - stream.Position];
var count = stream.Read(buf, 0, buf.Length);
//Set(buf, 0, count);
_buffer = buf;
_offset = 0;
_length = count;
Expand All @@ -587,6 +595,7 @@ public void Dispose()
HasOwner = false;
}
}
#endregion

/// <summary>获取分片包。在管理权生命周期内短暂使用</summary>
/// <returns></returns>
Expand All @@ -603,16 +612,18 @@ public void Dispose()
/// </remarks>
/// <param name="offset">偏移</param>
/// <param name="count">个数。默认-1表示到末尾</param>
public IPacket Slice(Int32 offset, Int32 count)
{
//var remain = _length - offset;
//if (count < 0 || count > remain) count = remain;
//if (offset == 0 && count == _length) return this;
public ArrayPacket Slice(Int32 offset, Int32 count) => (ArrayPacket)(this as IPacket).Slice(offset, count);

//var pk = new ArrayPacket(_buffer, _offset + offset, count) { HasOwner = HasOwner };
//HasOwner = false;

//return pk;
/// <summary>切片得到新数据包,同时转移内存管理权,当前数据包应尽快停止使用</summary>
/// <remarks>
/// 可能是引用同一块内存,也可能是新的内存。
/// 可能就是当前数据包,也可能引用相同的所有者或数组。
/// </remarks>
/// <param name="offset">偏移</param>
/// <param name="count">个数。默认-1表示到末尾</param>
IPacket IPacket.Slice(Int32 offset, Int32 count)
{
if (count == 0) return Empty;

IPacket? pk = null;
var start = Offset + offset;
Expand Down Expand Up @@ -645,8 +656,12 @@ public IPacket Slice(Int32 offset, Int32 count)
pk = new ArrayPacket(_buffer, start, remain) { Next = Next.Slice(0, count - remain) };
}

if (pk is ArrayPacket ap) ap.HasOwner = HasOwner;
HasOwner = false;
// 所有权转移
if (pk is ArrayPacket ap && ap._buffer == _buffer)
{
ap.HasOwner = HasOwner;
HasOwner = false;
}

return pk;
}
Expand Down
18 changes: 13 additions & 5 deletions NewLife.Core/Http/HttpBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
namespace NewLife.Http;

/// <summary>Http请求响应基类</summary>
public abstract class HttpBase
public abstract class HttpBase : IDisposable
{
#region 属性
/// <summary>协议版本</summary>
Expand Down Expand Up @@ -36,6 +36,11 @@ public abstract class HttpBase
public String this[String key] { get => Headers[key] + ""; set => Headers[key] = value; }
#endregion

#region 构造
/// <summary>释放</summary>
public void Dispose() => Body.TryDispose();
#endregion

#region 解析
/// <summary>快速验证协议头,剔除非HTTP协议。仅排除,验证通过不一定就是HTTP协议</summary>
/// <param name="data"></param>
Expand All @@ -50,7 +55,7 @@ public static Boolean FastValidHeader(ReadOnlySpan<Byte> data)

private static readonly Byte[] NewLine = [(Byte)'\r', (Byte)'\n'];
private static readonly Byte[] NewLine2 = [(Byte)'\r', (Byte)'\n', (Byte)'\r', (Byte)'\n'];
/// <summary>分析请求头</summary>
/// <summary>分析请求头。截取Body时获取缓冲区所有权</summary>
/// <param name="pk"></param>
/// <returns></returns>
public Boolean Parse(IPacket pk)
Expand Down Expand Up @@ -90,7 +95,7 @@ public Boolean Parse(IPacket pk)

//var str = pk.ReadBytes(0, p).ToStr();

// 截取
// 截取主体,获取所有权
//var lines = str.Split("\r\n");
Body = pk.Slice(p + 4);

Expand Down Expand Up @@ -118,14 +123,17 @@ public Boolean Parse(IPacket pk)

#region 读写
/// <summary>创建请求响应包</summary>
/// <remarks>数据来自缓冲池,使用者用完返回数据包后应该释放,以便把缓冲区放回池里</remarks>
/// <returns></returns>
public virtual IPacket Build()
public virtual IOwnerPacket Build()
{
var body = Body;
var len = body != null ? body.Total : 0;

var header = BuildHeader(len);
var pk = new ArrayPacket(Encoding.UTF8.GetByteCount(header) + len);

// 从内存池申请缓冲区,Slice后管理权转移,外部使用完以后释放
using var pk = new ArrayPacket(Encoding.UTF8.GetByteCount(header) + len);
var writer = new SpanWriter(pk.GetSpan());

//BuildHeader(writer, len);
Expand Down
2 changes: 1 addition & 1 deletion NewLife.Core/Http/HttpResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ protected override Boolean OnParse(String firstLine)

/// <summary>创建请求响应包</summary>
/// <returns></returns>
public override IPacket Build()
public override IOwnerPacket Build()
{
// 如果响应异常,则使用响应描述作为内容
if (StatusCode > HttpStatusCode.OK && Body == null && !StatusDescription.IsNullOrEmpty())
Expand Down
19 changes: 17 additions & 2 deletions NewLife.Core/Http/HttpSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,10 @@ public void Process(IData data)
if (req.ContentLength > MaxRequestLength)
{
var rs = new HttpResponse { StatusCode = HttpStatusCode.RequestEntityTooLarge };
_session.Send(rs.Build());

// 发送响应。用完后释放数据包,还给缓冲池
using var res = rs.Build();
_session.Send(res);
_session.Dispose();

return;
Expand All @@ -91,6 +93,10 @@ public void Process(IData data)
_cache = new MemoryStream(req.ContentLength);
req.Body?.CopyTo(_cache);
//req.Body = req.Body.Clone();

// 请求主体数据来自缓冲区,要还回去
req.Body.TryDispose();
req.Body = null;
}
}
else if (req != null)
Expand Down Expand Up @@ -130,11 +136,20 @@ public void Process(IData data)
var closing = !req.KeepAlive && _websocket == null;
if (closing && !rs.Headers.ContainsKey("Connection")) rs.Headers["Connection"] = "close";

_session.Send(rs.Build());
// 发送响应。用完后释放数据包,还给缓冲池
using var res = rs.Build();
_session.Send(res);

if (closing) _session.Dispose();
}
}

// 请求主体数据来自缓冲区,要还回去
if (req != null)
{
req.Body.TryDispose();
req.Body = null;
}
}

/// <summary>收到新的Http请求,只有头部</summary>
Expand Down
21 changes: 16 additions & 5 deletions NewLife.Core/Http/TinyHttpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,15 @@ protected virtual async Task<Stream> GetStreamAsync(Uri? uri)
/// <param name="uri"></param>
/// <param name="request"></param>
/// <returns></returns>
protected virtual async Task<IPacket> SendDataAsync(Uri? uri, IPacket? request)
protected virtual async Task<IOwnerPacket> SendDataAsync(Uri? uri, IPacket? request)
{
var ns = await GetStreamAsync(uri).ConfigureAwait(false);

// 发送
if (request != null) await request.CopyToAsync(ns).ConfigureAwait(false);

// 接收
var pk = new ArrayPacket(BufferSize);
using var pk = new ArrayPacket(BufferSize);
using var source = new CancellationTokenSource(Timeout);

#if NETCOREAPP || NETSTANDARD2_1
Expand All @@ -162,11 +162,11 @@ protected virtual async Task<IPacket> SendDataAsync(Uri? uri, IPacket? request)
while (retry-- > 0)
{
// 发出请求
rs = await SendDataAsync(uri, req).ConfigureAwait(false);
if (rs == null || rs.Length == 0) return null;
using var rs2 = await SendDataAsync(uri, req).ConfigureAwait(false);
if (rs2 == null || rs2.Length == 0) return null;

// 解析响应
if (!res.Parse(rs)) return res;
if (!res.Parse(rs2)) return res;
rs = res.Body;

// 跳转
Expand All @@ -181,6 +181,8 @@ protected virtual async Task<IPacket> SendDataAsync(Uri? uri, IPacket? request)

uri = uri2;
request.RequestUri = uri;

req.TryDispose();
req = request.Build();

continue;
Expand All @@ -190,6 +192,9 @@ protected virtual async Task<IPacket> SendDataAsync(Uri? uri, IPacket? request)
break;
}

// 释放数据包,还给缓冲池
req.TryDispose();

if (res.StatusCode != HttpStatusCode.OK) throw new Exception($"{(Int32)res.StatusCode} {res.StatusDescription}");

// 如果没有收完数据包
Expand All @@ -199,6 +204,7 @@ protected virtual async Task<IPacket> SendDataAsync(Uri? uri, IPacket? request)
var last = rs;
while (total < res.ContentLength)
{
//todo 这里的IPacket.Append可能有问题,因为last本质上是结构体
var pk = await SendDataAsync(null, null).ConfigureAwait(false);
last.Append(pk);

Expand All @@ -212,7 +218,10 @@ protected virtual async Task<IPacket> SendDataAsync(Uri? uri, IPacket? request)
{
// 如果不足则读取一个chunk,因为有可能第一个响应包只有头部
if (rs.Length == 0)
{
rs.TryDispose();
rs = await SendDataAsync(null, null).ConfigureAwait(false);
}

res.Body = await ReadChunkAsync(rs);
}
Expand Down Expand Up @@ -285,6 +294,8 @@ protected virtual async Task<IPacket> ReadChunkAsync(IPacket body)
if (pk == null || pk.Length == 0) break;
if (pk.Length > 0) continue;

pk.TryDispose();

// 读取新的数据片段,如果不存在则跳出
pk = await SendDataAsync(null, null).ConfigureAwait(false);
if (pk == null || pk.Length == 0) break;
Expand Down
6 changes: 3 additions & 3 deletions NewLife.Core/Http/WebSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class WebSocket
/// <param name="pk"></param>
public void Process(IPacket pk)
{
var message = new WebSocketMessage();
using var message = new WebSocketMessage();
if (message.Read(pk))
{
ActiveTime = DateTime.Now;
Expand Down Expand Up @@ -110,7 +110,7 @@ private void Send(WebSocketMessage msg)
var socket = Context?.Socket;
if (session == null && socket == null) throw new ObjectDisposedException(nameof(Context));

var data = msg.ToPacket();
using var data = msg.ToPacket();
if (session != null)
session.Send(data);
else
Expand Down Expand Up @@ -138,7 +138,7 @@ public void SendAll(IPacket data, WebSocketMessageType type, Func<INetSession, B
{
var session = (Context?.Connection) ?? throw new ObjectDisposedException(nameof(Context));
var msg = new WebSocketMessage { Type = type, Payload = data };
var data2 = msg.ToPacket();
using var data2 = msg.ToPacket();
session.Host.SendAllAsync(data2, predicate);
}

Expand Down
Loading

0 comments on commit 8f73218

Please sign in to comment.