Skip to content

Commit

Permalink
增强OwnerPacket作为具有内存所有权的主力实现
Browse files Browse the repository at this point in the history
  • Loading branch information
nnhy committed Sep 12, 2024
1 parent eb16d39 commit 9e99cc0
Show file tree
Hide file tree
Showing 11 changed files with 136 additions and 92 deletions.
121 changes: 72 additions & 49 deletions NewLife.Core/Data/IPacket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ public interface IPacket
/// <summary>拥有管理权的数据包。使用完以后需要释放</summary>
public interface IOwnerPacket : IPacket, IDisposable
{
/// <summary>是否拥有管理权</summary>
Boolean HasOwner { get; set; }
///// <summary>是否拥有管理权</summary>
//Boolean HasOwner { get; set; }
}

/// <summary>内存包辅助类</summary>
Expand All @@ -80,7 +80,7 @@ public static IPacket Append(this IPacket pk, IPacket next)
/// <param name="next"></param>
public static IPacket Append(this IPacket pk, Byte[] next) => Append(pk, new ArrayPacket(next));

/// <summary>转字符串并释放</summary>
/// <summary>转字符串</summary>
/// <param name="pk"></param>
/// <param name="encoding"></param>
/// <param name="offset"></param>
Expand All @@ -95,7 +95,7 @@ public static String ToStr(this IPacket pk, Encoding? encoding = null, Int32 off
if (span.Length > count) span = span[..count];

var rs = span.ToStr(encoding);
pk.TryDispose();
//pk.TryDispose();
return rs;
}

Expand All @@ -107,7 +107,7 @@ public static String ToStr(this IPacket pk, Encoding? encoding = null, Int32 off
if (span.Length > count) span = span[..count];

sb.Append(span.ToStr(encoding));
p.TryDispose();
//p.TryDispose();

count -= span.Length;
}
Expand Down Expand Up @@ -297,6 +297,8 @@ public static Boolean TryGetArray(this IPacket pk, out ArraySegment<Byte> segmen
{
if (pk.Next == null)
{
if (pk is OwnerPacket op && op.TryGetArray(out segment)) return true;

if (pk is ArrayPacket ap)
{
segment = new ArraySegment<Byte>(ap.Buffer, ap.Offset, ap.Length);
Expand All @@ -316,12 +318,16 @@ public static Boolean TryGetArray(this IPacket pk, out ArraySegment<Byte> segmen
/// <remarks>
/// 使用时务必明确所有权归属,用完后及时释放。
/// </remarks>
public struct OwnerPacket : IDisposable, IPacket, IOwnerPacket
public class OwnerPacket : MemoryManager<Byte>, IPacket, IOwnerPacket
{
#region 属性
private IMemoryOwner<Byte> _owner;
/// <summary>内存所有者</summary>
public IMemoryOwner<Byte> Owner => _owner;
private Byte[] _buffer;
/// <summary>缓冲区</summary>
public Byte[] Buffer => _buffer;

private readonly Int32 _offset;
/// <summary>数据偏移</summary>
public Int32 Offset => _offset;

private readonly Int32 _length;
/// <summary>数据长度</summary>
Expand All @@ -330,10 +336,7 @@ public struct OwnerPacket : IDisposable, IPacket, IOwnerPacket
/// <summary>获取/设置 指定位置的字节</summary>
/// <param name="index"></param>
/// <returns></returns>
public Byte this[Int32 index] { get => _owner.Memory.Span[index]; set => _owner.Memory.Span[index] = value; }

/// <summary>是否拥有管理权</summary>
public Boolean HasOwner { get; set; }
public Byte this[Int32 index] { get => _buffer[_offset + index]; set => _buffer[_offset + index] = value; }

/// <summary>下一个链式包</summary>
public IPacket? Next { get; set; }
Expand All @@ -342,6 +345,7 @@ public struct OwnerPacket : IDisposable, IPacket, IOwnerPacket
public Int32 Total => Length + (Next?.Total ?? 0);
#endregion

#region 构造
/// <summary>实例化指定长度的内存包,从共享内存池中借出</summary>
/// <param name="length">长度</param>
/// <exception cref="ArgumentOutOfRangeException"></exception>
Expand All @@ -350,46 +354,50 @@ public OwnerPacket(Int32 length)
if (length < 0)
throw new ArgumentOutOfRangeException(nameof(length), "Length must be non-negative and less than or equal to the memory owner's length.");

_owner = MemoryPool<Byte>.Shared.Rent(length);
_buffer = ArrayPool<Byte>.Shared.Rent(length);
_offset = 0;
_length = length;
HasOwner = true;
}

/// <summary>实例化内存包,指定内存所有者和长度</summary>
/// <param name="memoryOwner">内存所有者</param>
/// <param name="buffer">缓冲区</param>
/// <param name="offset"></param>
/// <param name="length">长度</param>
/// <exception cref="ArgumentOutOfRangeException"></exception>
public OwnerPacket(IMemoryOwner<Byte> memoryOwner, Int32 length)
private OwnerPacket(Byte[] buffer, Int32 offset, Int32 length)
{
if (length < 0 || length > memoryOwner.Memory.Length)
if (offset < 0 || length < 0 || offset + length > buffer.Length)
throw new ArgumentOutOfRangeException(nameof(length), "Length must be non-negative and less than or equal to the memory owner's length.");

_owner = memoryOwner;
_buffer = buffer;
_offset = offset;
_length = length;
}

/// <summary>释放</summary>
public void Dispose()
/// <summary>销毁释放</summary>
/// <param name="disposing"></param>
protected override void Dispose(Boolean disposing)
{
//if (!HasOwner) throw new InvalidOperationException("Has not owner.");

var owner = _owner;
if (HasOwner && owner != null)
var buffer = _buffer;
if (buffer != null)
{
// 释放内存所有者以后,直接置空,避免重复使用
_owner = null!;
owner.Dispose();
HasOwner = false;
_buffer = null!;

ArrayPool<Byte>.Shared.Return(buffer);
}

Next.TryDispose();
}
#endregion

/// <summary>获取分片包。在管理权生命周期内短暂使用</summary>
/// <returns></returns>
public Span<Byte> GetSpan() => _owner.Memory.Span[.._length];
public override Span<Byte> GetSpan() => new(_buffer, _offset, _length);

/// <summary>获取内存包。在管理权生命周期内短暂使用</summary>
/// <returns></returns>
public Memory<Byte> GetMemory() => _owner.Memory[.._length];
public Memory<Byte> GetMemory() => new(_buffer, _offset, _length);

/// <summary>切片得到新数据包,同时转移内存管理权,当前数据包应尽快停止使用</summary>
/// <remarks>
Expand All @@ -398,29 +406,47 @@ public void Dispose()
/// </remarks>
/// <param name="offset">偏移</param>
/// <param name="count">个数。默认-1表示到末尾</param>
public IPacket Slice(Int32 offset, Int32 count)
IPacket IPacket.Slice(Int32 offset, Int32 count) => Slice(offset, count);

/// <summary>切片得到新数据包,同时转移内存管理权,当前数据包应尽快停止使用</summary>
/// <remarks>
/// 可能是引用同一块内存,也可能是新的内存。
/// 可能就是当前数据包,也可能引用相同的所有者或数组。
/// </remarks>
/// <param name="offset">偏移</param>
/// <param name="count">个数。默认-1表示到末尾</param>
public OwnerPacket Slice(Int32 offset, Int32 count)
{
var remain = _length - offset;
if (count < 0 || count > remain) count = remain;
if (offset == 0 && count == _length) return this;

if (offset == 0)
{
var pk = new OwnerPacket(_owner, count) { HasOwner = HasOwner };
HasOwner = false;
return pk;
}
return new OwnerPacket(_buffer, _offset + offset, count);
}

// 当前数据包可能会释放,必须拷贝数据
//return new MemoryPacket(_owner.Memory.Slice(offset, count), count);
var rs = new ArrayPacket(count);
GetSpan().CopyTo(rs.GetSpan());
return rs;
/// <summary>尝试获取数据段</summary>
/// <param name="segment"></param>
/// <returns></returns>
protected override Boolean TryGetArray(out ArraySegment<Byte> segment)
{
segment = new ArraySegment<Byte>(_buffer, _offset, _length);
return true;
}

/// <summary>钉住内存</summary>
/// <param name="elementIndex"></param>
/// <returns></returns>
/// <exception cref="NotSupportedException"></exception>
public override MemoryHandle Pin(Int32 elementIndex = 0) => throw new NotSupportedException();

/// <summary>取消钉内存</summary>
/// <exception cref="NotImplementedException"></exception>
public override void Unpin() => throw new NotImplementedException();

#region 重载运算符
/// <summary>已重载</summary>
/// <returns></returns>
public override String ToString() => $"[{_owner.Memory.Length}](0, {_length})" + (Next == null ? "" : $"<{Total}>");
public override String ToString() => $"[{_buffer.Length}]({_offset}, {_length})" + (Next == null ? "" : $"<{Total}>");
#endregion
}

/// <summary>内存包</summary>
Expand Down Expand Up @@ -511,11 +537,6 @@ public struct ArrayPacket : IDisposable, IPacket, IOwnerPacket
/// <summary>数据长度</summary>
public Int32 Length => _length;

///// <summary>获取/设置 指定位置的字节</summary>
///// <param name="index"></param>
///// <returns></returns>
//public Byte this[Int32 index] { get => _buffer[_offset + index]; set => _buffer[_offset + index] = value; }

/// <summary>是否拥有管理权。Dispose时,若有管理权则还给池里</summary>
public Boolean HasOwner { get; set; }

Expand Down Expand Up @@ -660,6 +681,8 @@ public void Dispose()
Pool.Shared.Return(buf);
HasOwner = false;
}

Next.TryDispose();
}
#endregion

Expand Down
16 changes: 5 additions & 11 deletions NewLife.Core/Http/HttpBase.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Text;
using System.Buffers;
using System.Text;
using NewLife.Buffers;
using NewLife.Collections;
using NewLife.Data;
Expand Down Expand Up @@ -133,23 +134,16 @@ public virtual IOwnerPacket Build()
var header = BuildHeader(len);

// 从内存池申请缓冲区,Slice后管理权转移,外部使用完以后释放
using var pk = new ArrayPacket(Encoding.UTF8.GetByteCount(header) + len);
//using var pk = new ArrayPacket(Encoding.UTF8.GetByteCount(header) + len);
len += Encoding.UTF8.GetByteCount(header);
var pk = new OwnerPacket(len);
var writer = new SpanWriter(pk.GetSpan());

//BuildHeader(writer, len);
writer.Write(header, -1);

if (body != null) writer.Write(body.GetSpan());

return pk.Slice(0, writer.Position);

//var header = BuildHeader(len);
//var rs = new Packet(header.GetBytes())
//{
// Next = data
//};

//return rs;
}

/// <summary>创建头部</summary>
Expand Down
10 changes: 5 additions & 5 deletions NewLife.Core/Http/TinyHttpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ protected virtual async Task<IOwnerPacket> SendDataAsync(Uri? uri, IPacket? requ
if (request != null) await request.CopyToAsync(ns).ConfigureAwait(false);

// 接收
using var pk = new ArrayPacket(BufferSize);
var pk = new OwnerPacket(BufferSize);
using var source = new CancellationTokenSource(Timeout);

#if NETCOREAPP || NETSTANDARD2_1
Expand All @@ -162,7 +162,7 @@ protected virtual async Task<IOwnerPacket> SendDataAsync(Uri? uri, IPacket? requ
while (retry-- > 0)
{
// 发出请求
using var rs2 = await SendDataAsync(uri, req).ConfigureAwait(false);
var rs2 = await SendDataAsync(uri, req).ConfigureAwait(false);
if (rs2 == null || rs2.Length == 0) return null;

// 解析响应
Expand Down Expand Up @@ -204,7 +204,7 @@ protected virtual async Task<IOwnerPacket> SendDataAsync(Uri? uri, IPacket? requ
var last = rs;
while (total < res.ContentLength)
{
//todo 这里的IPacket.Append可能有问题,因为last本质上是结构体
// 这里的IPacket.Append可能有问题,因为last不能是结构体
var pk = await SendDataAsync(null, null).ConfigureAwait(false);
last.Append(pk);

Expand Down Expand Up @@ -343,7 +343,7 @@ private IPacket ParseChunk(IPacket rs, out Int32 offset, out Int32 octets)
RequestUri = new Uri(url),
};

var rs = (await SendAsync(request).ConfigureAwait(false));
using var rs = (await SendAsync(request).ConfigureAwait(false));
return rs?.Body?.ToStr();
}
#endregion
Expand All @@ -360,7 +360,7 @@ private IPacket ParseChunk(IPacket rs, out Int32 offset, out Int32 octets)
var baseAddress = BaseAddress ?? throw new ArgumentNullException(nameof(BaseAddress));
var request = BuildRequest(baseAddress, method, action, args);

var rs = await SendAsync(request);
using var rs = await SendAsync(request);

if (rs == null || rs.Body == null || rs.Body.Length == 0) return default;

Expand Down
4 changes: 2 additions & 2 deletions NewLife.Core/Http/WebSocketMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public virtual IOwnerPacket ToPacket()
if (!StatusDescription.IsNullOrEmpty()) len += Encoding.UTF8.GetByteCount(StatusDescription);
}

var rs = new ArrayPacket(1 + 1 + 8 + 4 + len);
var rs = new OwnerPacket(1 + 1 + 8 + 4 + len);
var writer = new SpanWriter(rs.GetSpan())
{
IsLittleEndian = false
Expand Down Expand Up @@ -212,7 +212,7 @@ public virtual IOwnerPacket ToPacket()
if (!StatusDescription.IsNullOrEmpty()) writer.Write(StatusDescription, -1);
}

return (rs.Slice(0, writer.Position) as IOwnerPacket)!;
return rs.Slice(0, writer.Position);
}
#endregion
}
2 changes: 1 addition & 1 deletion NewLife.Core/Net/Handlers/LengthFieldCodec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ protected override Object Encode(IHandlerContext context, IPacket msg)
}
else
{
msg = new ArrayPacket(len) { Next = msg };
msg = new OwnerPacket(len) { Next = msg };
}

var writer = new SpanWriter(msg.GetSpan()) { IsLittleEndian = Size > 0 };
Expand Down
14 changes: 13 additions & 1 deletion NewLife.Core/Net/Handlers/MessageCodec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,15 @@ public class MessageCodec<T> : Handler
/// <returns></returns>
public override Object? Write(IHandlerContext context, Object message)
{
// 谁申请,谁归还
IOwnerPacket? owner = null;
if (message is T msg)
{
var rs = Encode(context, msg);
if (rs == null) return null;

message = rs;
owner = rs as IOwnerPacket;

// 加入队列,忽略请求消息
if (message is IMessage msg2)
Expand All @@ -62,7 +66,15 @@ public class MessageCodec<T> : Handler
AddToQueue(context, msg);
}

return base.Write(context, message);
try
{
return base.Write(context, message);
}
finally
{
// 下游可能忘了释放内存,这里兜底释放
owner?.Dispose();
}
}

/// <summary>编码消息,一般是编码为Packet后传给下一个处理器</summary>
Expand Down
Loading

0 comments on commit 9e99cc0

Please sign in to comment.