Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pool commands #1590

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 97 additions & 27 deletions src/BenchmarksApps/Kestrel/PlatformBenchmarks/Data/RawDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,23 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

using Microsoft.Extensions.Caching.Memory;

using Npgsql;

namespace PlatformBenchmarks
{
public class RawDb
{
private static readonly ConcurrentQueue<SqlReadCommand> _sqlReadCommands = new ConcurrentQueue<SqlReadCommand>();
private static readonly ConcurrentQueue<SqlFortuneCommand> _sqlFortuneCommands = new ConcurrentQueue<SqlFortuneCommand>();
private static readonly object[] _cacheKeys = Enumerable.Range(0, 10001).Select((i) => new CacheKey(i)).ToArray();
private readonly ConcurrentRandom _random;
private readonly string _connectionString;
private readonly MemoryCache _cache = new MemoryCache(
Expand All @@ -33,9 +39,10 @@ public async Task<World> LoadSingleQueryRow()
{
await db.OpenAsync();

var (cmd, _) = CreateReadCommand(db);
using (cmd)
using (var cmd = CreateReadCommand())
{
cmd.Connection = db;
cmd.Parameter.TypedValue = _random.Next(0, 10000) + 1;
return await ReadSingleRow(cmd);
}
}
Expand All @@ -49,13 +56,14 @@ public async Task<World[]> LoadMultipleQueriesRows(int count)
{
await db.OpenAsync();

var (cmd, idParameter) = CreateReadCommand(db);
using (cmd)
using (var cmd = CreateReadCommand())
{
cmd.Connection = db;
var param = cmd.Parameter;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In npgsql/npgsql#3200 (comment) you say

The typical scenario of reexecuting the same command with the same SQL also does it on the same connection, in which case explicit preparation is the right choice and bypasses everything.

Since this is executed 20 times just changing the value, should it do a prepare here?

Copy link
Member

@roji roji Oct 15, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should definitely give it a try... I don't pay enough attention as I should to the non-Fortunes benchmarks.

Automatic preparation still has the advantage of doing one less roundtrip - the first execution prepares and executes in the same go, where with explicit preparation they're split. But as you add more executions for a single initial prepare the impact of that goes down.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ye olde score multiplier for composite scores
image

Automatic preparation still has the advantage of doing one less roundtrip - the first execution prepared and executes in the same go, where with explicit preparation they're split. But as you add more executions for a single initial prepare the impact of that goes down.

Which is kinda why I want to bypass the parse; then its auto prep + parse once, rather than parse 20 times with auto prep?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I'll try to take some time this weekend to play around with bypassing the parse. In any case I need to run (and update) the benchmarks for the newest Npgsql 5.0.0 (just released preview1) - will look into the parsing thing as part of that.

for (int i = 0; i < result.Length; i++)
{
param.TypedValue = _random.Next(0, 10000) + 1;
result[i] = await ReadSingleRow(cmd);
idParameter.TypedValue = _random.Next(1, 10001);
}
}
}
Expand Down Expand Up @@ -93,26 +101,27 @@ static async Task<World[]> LoadUncachedQueries(int id, int i, int count, RawDb r
{
await db.OpenAsync();

var (cmd, idParameter) = rawdb.CreateReadCommand(db);
using (cmd)
using (var cmd = CreateReadCommand())
{
Func<ICacheEntry, Task<CachedWorld>> create = async (entry) =>
cmd.Connection = db;
Func<ICacheEntry, Task<CachedWorld>> create = async (entry) =>
{
return await rawdb.ReadSingleRow(cmd);
};

var cacheKeys = _cacheKeys;
var key = cacheKeys[id];

idParameter.TypedValue = id;
var param = cmd.Parameter;
param.TypedValue = id;

for (; i < result.Length; i++)
{
var data = await rawdb._cache.GetOrCreateAsync<CachedWorld>(key, create);
result[i] = data;

id = rawdb._random.Next(1, 10001);
idParameter.TypedValue = id;
param.TypedValue = id;
key = cacheKeys[id];
}
}
Expand All @@ -128,14 +137,14 @@ public async Task PopulateCache()
{
await db.OpenAsync();

var (cmd, idParameter) = CreateReadCommand(db);
using (cmd)
using (var cmd = CreateReadCommand())
{
cmd.Connection = db;
var cacheKeys = _cacheKeys;
var cache = _cache;
for (var i = 1; i < 10001; i++)
{
idParameter.TypedValue = i;
cmd.Parameter.TypedValue = i;
cache.Set<CachedWorld>(cacheKeys[i], await ReadSingleRow(cmd));
}
}
Expand All @@ -152,13 +161,14 @@ public async Task<World[]> LoadMultipleUpdatesRows(int count)
{
await db.OpenAsync();

var (queryCmd, queryParameter) = CreateReadCommand(db);
using (queryCmd)
using (var cmd = CreateReadCommand())
{
cmd.Connection = db;
var queryParameter = cmd.Parameter;
for (int i = 0; i < results.Length; i++)
{
results[i] = await ReadSingleRow(queryCmd);
queryParameter.TypedValue = _random.Next(1, 10001);
queryParameter.TypedValue = _random.Next(0, 10000) + 1;
results[i] = await ReadSingleRow(cmd);
}
}

Expand Down Expand Up @@ -192,34 +202,46 @@ public async Task<List<Fortune>> LoadFortunesRows()
{
await db.OpenAsync();

using (var cmd = new NpgsqlCommand("SELECT id, message FROM fortune", db))
using var cmd = CreateFortuneCommand();
cmd.Connection = db;

using (var rdr = await cmd.ExecuteReaderAsync())
{
while (await rdr.ReadAsync())
{
result.Add(new Fortune
(
id:rdr.GetInt32(0),
id: rdr.GetInt32(0),
message: rdr.GetString(1)
));
}
}
}

result.Add(new Fortune(id: 0, message: "Additional fortune added at request time." ));
result.Add(new Fortune(id: 0, message: "Additional fortune added at request time."));
result.Sort();

return result;
}

private (NpgsqlCommand readCmd, NpgsqlParameter<int> idParameter) CreateReadCommand(NpgsqlConnection connection)
private static SqlReadCommand CreateReadCommand()
{
var cmd = new NpgsqlCommand("SELECT id, randomnumber FROM world WHERE id = @Id", connection);
var parameter = new NpgsqlParameter<int>(parameterName: "@Id", value: _random.Next(1, 10001));
if (!_sqlReadCommands.TryDequeue(out var cmd))
{
cmd = new SqlReadCommand();
}

cmd.Parameters.Add(parameter);
return cmd;
}

return (cmd, parameter);
private static SqlFortuneCommand CreateFortuneCommand()
{
if (!_sqlFortuneCommands.TryDequeue(out var cmd))
{
cmd = new SqlFortuneCommand();
}

return cmd;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand All @@ -237,7 +259,55 @@ private async Task<World> ReadSingleRow(NpgsqlCommand cmd)
}
}

private static readonly object[] _cacheKeys = Enumerable.Range(0, 10001).Select((i) => new CacheKey(i)).ToArray();
internal class SqlFortuneCommand : IDisposable
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these wrapper classes actually necessary, why not pool NpgsqlCommand directly? As far as I can tell they're mainly there to enqueue back when disposing, but that can just be done by the code using the command instead of disposing, no?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mainly so the callsites are just a using block and doon't have to worry about pooling e.g. its

using (var cmd = CreateReadCommand())
{
    cmd.Connection = db;
    // do something with pooled command
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. Moving the pooling logic here and removing the wrappers might make a tiny bit of difference too.

{
private readonly NpgsqlCommand _cmd;

public NpgsqlConnection Connection
{
set { _cmd.Connection = value; }
}

public SqlFortuneCommand()
{
_cmd = new NpgsqlCommand("SELECT id, message FROM fortune");
}

public Task<NpgsqlDataReader> ExecuteReaderAsync() => _cmd.ExecuteReaderAsync();

public void Dispose()
{
_cmd.Connection = null;
_sqlFortuneCommands.Enqueue(this);
}
}

internal class SqlReadCommand : IDisposable
{
private readonly NpgsqlCommand _cmd;
private readonly NpgsqlParameter<int> _parameter;

public NpgsqlParameter<int> Parameter => _parameter;
public NpgsqlConnection Connection
{
set { _cmd.Connection = value; }
}

public SqlReadCommand()
{
_cmd = new NpgsqlCommand("SELECT id, randomnumber FROM world WHERE id = @Id");
_parameter = new NpgsqlParameter<int>(parameterName: "@Id", value: 0);
_cmd.Parameters.Add(_parameter);
}

public static implicit operator NpgsqlCommand(SqlReadCommand c) => c._cmd;

public void Dispose()
{
_cmd.Connection = null;
_sqlReadCommands.Enqueue(this);
}
}

public sealed class CacheKey : IEquatable<CacheKey>
{
Expand All @@ -249,7 +319,7 @@ public CacheKey(int value)
public bool Equals(CacheKey key)
=> key._value == _value;

public override bool Equals(object obj)
public override bool Equals(object obj)
=> ReferenceEquals(obj, this);

public override int GetHashCode()
Expand Down