Skip to content
Open
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
4cfb897
Refactored for trunk
matt-welch Sep 16, 2024
125a246
Refactor for readability
matt-welch Sep 18, 2024
6dcaed8
:bugfix: Remove redundant declaration of overallStats
matt-welch Sep 20, 2024
46fcad9
Remove overallStats
matt-welch Sep 20, 2024
df6a119
Merge branch 'trunk' into KIP-1052-test-PR
matt-welch Sep 30, 2024
6dfbaa7
:bugfix: Properly create callback during switch
matt-welch Oct 3, 2024
74f2f1c
Merge branch 'trunk' into KIP-1052-PR
matt-welch Oct 3, 2024
baef288
Fix redundant numRecords declaration
matt-welch Oct 3, 2024
50a7565
Fix erroneous extra bracket
matt-welch Oct 3, 2024
4dd891f
Fix ternary operator in printTotal
matt-welch Oct 4, 2024
ce9ce6b
Fix selection in ternary operator for proper label
matt-welch Oct 4, 2024
9be5385
Fix concatenating stats constructor
matt-welch Oct 17, 2024
fb8a4ca
Fix checkstyle error and print
matt-welch Oct 21, 2024
440329b
Merge branch 'trunk' into KIP-1052-PR
matt-welch Oct 21, 2024
a57554b
Refactor for redundant stats objects
matt-welch Dec 11, 2024
818d1c6
Merge branch 'trunk' of github.com:apache/kafka into trunk
Jan 9, 2025
b0a6c20
Fix double-printing of windows during steady state
Jan 16, 2025
b127087
Remove unused constructor, fix types
matt-welch Jan 16, 2025
a55a429
Merge branch 'trunk' into KIP-1052-PR
matt-welch Mar 3, 2025
4ff4daa
Refactor inner callback conditional
matt-welch Mar 24, 2025
7def5f4
Move DEFAULT_REPORTING_INTERVAL_MS into Stats
matt-welch Apr 18, 2025
2026e14
Update tools/src/main/java/org/apache/kafka/tools/ProducerPerformance…
visz11 Sep 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 62 additions & 11 deletions tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,11 @@ void start(String[] args) throws IOException {
// not thread-safe, do not share with other threads
SplittableRandom random = new SplittableRandom(0);
ProducerRecord<byte[], byte[]> record;
stats = new Stats(config.numRecords, 5000);

if (config.warmupRecords > 0) {
System.out.println("Warmup first " + config.warmupRecords + " records. Steady state results will print after the complete test summary.");
}
stats = new Stats(config.numRecords);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The stats object is initialized with config.numRecords, which includes the warmup records. Consider initializing it with config.numRecords - config.warmupRecords to represent only the steady-state records. This might provide a clearer representation of the steady-state statistics.

Suggested change
stats = new Stats(config.numRecords);
stats = new Stats(config.numRecords - config.warmupRecords);

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stats Constructor Mismatch

The main Stats object is initialized with just numRecords, using the new constructor that defaults isSteadyState to false. However, the steadyStateActive field is later set to true on this object, creating an inconsistent state where the object is not initialized as steady state but is marked as active.

Standards
  • Logic-Verification-State-Consistency
  • Business-Rule-Object-Initialization

long startMs = System.currentTimeMillis();

ThroughputThrottler throttler = new ThroughputThrottler(config.throughput, startMs);
Expand All @@ -94,7 +98,11 @@ void start(String[] args) throws IOException {
record = new ProducerRecord<>(config.topicName, payload);

long sendStartMs = System.currentTimeMillis();
cb = new PerfCallback(sendStartMs, payload.length, stats);
if ( config.warmupRecords > 0 && i == config.warmupRecords ) {
steadyStateStats = new Stats(config.numRecords - config.warmupRecords, config.warmupRecords > 0);
stats.steadyStateActive = true;
Comment on lines +102 to +103
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Redundant Stats Initialization

The code initializes steadyStateStats with a boolean parameter derived from config.warmupRecords > 0, which is always true at this point. This redundant check creates confusion about the initialization state and could lead to inconsistent behavior.

Standards
  • Clean-Code-Logical-Clarity
  • Algorithm-Correctness-Initialization

}
cb = new PerfCallback(sendStartMs, payload.length, stats, steadyStateStats);
Comment on lines +101 to +105
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential NullPointerException Risk

The steadyStateStats variable is passed to PerfCallback constructor even when it's null (when warmupRecords is 0 or i hasn't reached warmupRecords yet). This will cause NullPointerException in PerfCallback.onCompletion() when it tries to access steadyStateStats methods.

                if (config.warmupRecords > 0 && i == config.warmupRecords) {
                    steadyStateStats = new Stats(config.numRecords - config.warmupRecords, true);
                    stats.steadyStateActive = true;
                }
                Stats callbackSteadyStateStats = config.warmupRecords > 0 ? steadyStateStats : null;
                cb = new PerfCallback(sendStartMs, payload.length, stats, callbackSteadyStateStats);
Commitable Suggestion
Suggested change
if ( config.warmupRecords > 0 && i == config.warmupRecords ) {
steadyStateStats = new Stats(config.numRecords - config.warmupRecords, config.warmupRecords > 0);
stats.steadyStateActive = true;
}
cb = new PerfCallback(sendStartMs, payload.length, stats, steadyStateStats);
if (config.warmupRecords > 0 && i == config.warmupRecords) {
steadyStateStats = new Stats(config.numRecords - config.warmupRecords, true);
stats.steadyStateActive = true;
}
Stats callbackSteadyStateStats = config.warmupRecords > 0 ? steadyStateStats : null;
cb = new PerfCallback(sendStartMs, payload.length, stats, callbackSteadyStateStats);
Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • SRE-Error-Handling

Comment on lines +101 to +105
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent null handling for steadyStateStats

The steadyStateStats variable is passed to PerfCallback constructor before it's initialized when warmupRecords is 0 or before reaching the warmup threshold. This creates a risk of NullPointerException when the callback tries to access steadyStateStats methods.

Standards
  • Clean-Code-Defensive-Programming
  • SOLID-SRP
  • Refactoring-Null-Safety

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Callback NPE Risk

The steadyStateStats variable is passed to PerfCallback constructor even when null (when warmupRecords is 0 or i hasn't reached warmupRecords). This creates a null pointer exception risk in PerfCallback.onCompletion() when accessing steadyStateStats methods.

Standards
  • CWE-476
  • OWASP-A06

producer.send(record, cb);

currentTransactionSize++;
Expand All @@ -116,6 +124,10 @@ record = new ProducerRecord<>(config.topicName, payload);

/* print final results */
stats.printTotal();
/* print steady-state stats if relevant */
if (steadyStateStats != null) {
steadyStateStats.printTotal();
}
} else {
// Make sure all messages are sent before printing out the stats and the metrics
// We need to do this in a different branch for now since tests/kafkatest/sanity_checks/test_performance_services.py
Comment on lines 125 to 134
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicate Code

Identical code blocks for printing stats appear in both branches of the conditional. This violates the DRY principle and creates maintenance burden when changes to the stats printing logic are needed, requiring updates in multiple places.

Standards
  • Clean-Code-DRY
  • Refactoring-Extract-Method
  • Maintainability-Quality-Duplication

Expand All @@ -124,6 +136,10 @@ record = new ProducerRecord<>(config.topicName, payload);

/* print final results */
stats.printTotal();
/* print steady-state stats if relevant */
if (steadyStateStats != null) {
steadyStateStats.printTotal();
}

/* print out metrics */
ToolsUtils.printMetrics(producer.metrics());
Expand All @@ -146,8 +162,8 @@ KafkaProducer<byte[], byte[]> createKafkaProducer(Properties props) {
}

Callback cb;

Stats stats;
Stats steadyStateStats;
Comment on lines 166 to +167
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent Variable Declaration

The steadyStateStats variable is declared but not initialized, while stats is similarly declared. This inconsistency in variable initialization pattern could lead to confusion and potential null pointer exceptions if not properly managed throughout the code.

Standards
  • ISO-IEC-25010-Reliability-Maturity
  • ISO-IEC-25010-Functional-Correctness-Appropriateness


static byte[] generateRandomPayload(Integer recordSize, List<byte[]> payloadByteList, byte[] payload,
SplittableRandom random, boolean payloadMonotonic, long recordValue) {
Expand All @@ -163,7 +179,7 @@ static byte[] generateRandomPayload(Integer recordSize, List<byte[]> payloadByte
}
return payload;
}

static Properties readProps(List<String> producerProps, String producerConfig) throws IOException {
Properties props = new Properties();
if (producerConfig != null) {
Expand Down Expand Up @@ -326,6 +342,16 @@ static ArgumentParser argParser() {
"--producer.config, or --transactional-id but --transaction-duration-ms is not specified, " +
"the default value will be 3000.");

parser.addArgument("--warmup-records")
.action(store())
.required(false)
.type(Long.class)
.metavar("WARMUP-RECORDS")
.dest("warmupRecords")
.setDefault(0L)
.help("The number of records to treat as warmup; these initial records will not be included in steady-state statistics. " +
"An additional summary line will be printed describing the steady-state statistics. (default: 0).");

return parser;
}

Expand All @@ -346,8 +372,14 @@ static class Stats {
private long windowTotalLatency;
private long windowBytes;
private long windowStart;
private final boolean isSteadyState;
private boolean steadyStateActive;

public Stats(long numRecords) {
this(numRecords, false);
}
Comment on lines +379 to +381
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Redundant Parameter Check

The Stats constructor with one parameter calls the two-parameter constructor with false for isSteadyState, but later in the code, steadyStateActive is set based on config.warmupRecords > 0. This inconsistency in initialization could lead to incorrect behavior in steady state reporting.

Standards
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • ISO-IEC-25010-Reliability-Maturity


public Stats(long numRecords, int reportingInterval) {
public Stats(long numRecords, boolean isSteadyState) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Parameter Name Change

The constructor parameter was changed from 'reportingInterval' to 'isSteadyState' with different types (int to boolean), but the parameter is still used as reportingInterval in the constructor body. This creates a significant maintainability issue as the parameter name no longer reflects its usage.

        public Stats(long numRecords, boolean isSteadyState) {
            this.start = System.currentTimeMillis();
            this.windowStart = System.currentTimeMillis();
            this.iteration = 0;
            this.count = 0;
            this.bytes = 0;
            this.maxLatency = 0;
            this.totalLatency = 0;
            this.windowCount = 0;
            this.windowMaxLatency = 0;
            this.windowTotalLatency = 0;
            this.windowBytes = 0;
            this.totalLatency = 0;
            this.reportingInterval = 5000;
            this.isSteadyState = isSteadyState;
            this.steadyStateActive = isSteadyState;
        }
Commitable Suggestion
Suggested change
public Stats(long numRecords, boolean isSteadyState) {
public Stats(long numRecords, boolean isSteadyState) {
this.start = System.currentTimeMillis();
this.windowStart = System.currentTimeMillis();
this.iteration = 0;
this.count = 0;
this.bytes = 0;
this.maxLatency = 0;
this.totalLatency = 0;
this.windowCount = 0;
this.windowMaxLatency = 0;
this.windowTotalLatency = 0;
this.windowBytes = 0;
this.totalLatency = 0;
this.reportingInterval = 5000;
this.isSteadyState = isSteadyState;
this.steadyStateActive = isSteadyState;
}
Standards
  • Clean-Code-Meaningful-Names
  • SOLID-SRP
  • Maintainability-Quality-Interfaces

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent parameter naming in Stats constructor

The constructor parameter was changed from 'reportingInterval' to 'isSteadyState' with different types, but the parameter name no longer reflects its usage. This creates confusion as the parameter name suggests one purpose while the implementation uses it differently.

Standards
  • Clean-Code-Meaningful-Names
  • SOLID-SRP
  • Maintainability-Quality-Interfaces

this.start = System.currentTimeMillis();
this.windowStart = System.currentTimeMillis();
this.iteration = 0;
Expand All @@ -360,7 +392,9 @@ public Stats(long numRecords, int reportingInterval) {
this.windowTotalLatency = 0;
this.windowBytes = 0;
this.totalLatency = 0;
this.reportingInterval = reportingInterval;
this.reportingInterval = 5000;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hardcoded Value

The reporting interval is now hardcoded to 5000 instead of being passed as a parameter. This reduces flexibility and makes the code less configurable. The previous parameterized approach was more maintainable as it allowed for different reporting intervals without code changes.

Standards
  • Clean-Code-Configuration
  • Maintainability-Quality-Flexibility

this.isSteadyState = isSteadyState;
this.steadyStateActive = isSteadyState;
}

public void record(int latency, int bytes, long time) {
Expand All @@ -378,7 +412,12 @@ public void record(int latency, int bytes, long time) {
}
/* maybe report the recent perf */
if (time - windowStart >= reportingInterval) {
printWindow();
if (this.isSteadyState && count == windowCount) {
System.out.println("Beginning steady state.");
}
if (this.isSteadyState || !this.steadyStateActive) {
printWindow();
}
newWindow();
}
}
Expand Down Expand Up @@ -428,8 +467,9 @@ public void printTotal() {
double recsPerSec = 1000.0 * count / (double) elapsed;
double mbPerSec = 1000.0 * this.bytes / (double) elapsed / (1024.0 * 1024.0);
int[] percs = percentiles(this.latencies, index, 0.5, 0.95, 0.99, 0.999);
System.out.printf("%d records sent, %.1f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.%n",
System.out.printf("%d%s records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.%n",
count,
this.isSteadyState ? " steady state" : "",
recsPerSec,
mbPerSec,
totalLatency / (double) count,
Expand All @@ -456,10 +496,12 @@ static final class PerfCallback implements Callback {
private final long start;
private final int bytes;
private final Stats stats;
private final Stats steadyStateStats;

public PerfCallback(long start, int bytes, Stats stats) {
public PerfCallback(long start, int bytes, Stats stats, Stats steadyStateStats) {
this.start = start;
this.stats = stats;
this.steadyStateStats = steadyStateStats;
this.bytes = bytes;
}

Expand All @@ -471,6 +513,10 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
this.stats.record(latency, bytes, now);
this.stats.iteration++;
if (steadyStateStats != null) {
this.steadyStateStats.record(latency, bytes, now);
this.steadyStateStats.iteration++;
Comment on lines +517 to +519

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The steadyStateStats object records all latencies, bytes, and iterations, even during the warmup phase. This could skew the steady-state statistics. Consider only recording these metrics after the warmup phase is complete, by adding a check to ensure that i >= config.warmupRecords.

Suggested change
if (steadyStateStats != null) {
this.steadyStateStats.record(latency, bytes, now);
this.steadyStateStats.iteration++;
if (steadyStateStats != null && i >= config.warmupRecords) {
this.steadyStateStats.record(latency, bytes, now);
this.steadyStateStats.iteration++;
}

}
Comment on lines +517 to +520
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Initialization Check

The code checks if steadyStateStats is null before accessing it, but then uses this.steadyStateStats which might still be null if the field wasn't initialized. This inconsistent null check pattern could lead to NullPointerException.

                if (steadyStateStats != null) {
                    steadyStateStats.record(latency, bytes, now);
                    steadyStateStats.iteration++;
                }
Commitable Suggestion
Suggested change
if (steadyStateStats != null) {
this.steadyStateStats.record(latency, bytes, now);
this.steadyStateStats.iteration++;
}
if (steadyStateStats != null) {
steadyStateStats.record(latency, bytes, now);
steadyStateStats.iteration++;
}
Standards
  • Algorithm-Correctness-Null-Safety
  • Logic-Verification-Defensive-Programming

Comment on lines +517 to +520
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NullPointerException in PerfCallback.onCompletion

The code checks if steadyStateStats is null before accessing it, but then uses this.steadyStateStats which could still be null. This inconsistent null check pattern could lead to NullPointerException when steadyStateStats is null.

Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • SRE-Error-Handling
  • DbC-Resource-Mgmt

Comment on lines +517 to +520
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Steady State Recording Logic

The code records all messages in steadyStateStats regardless of whether they're warmup or steady state records. This defeats the purpose of separating warmup from steady state measurements and will skew the steady state statistics.

Standards
  • Algorithm-Correctness-Data-Collection
  • Business-Rule-Measurement-Accuracy

}
if (exception != null)
exception.printStackTrace();
Expand All @@ -479,7 +525,8 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {

static final class ConfigPostProcessor {
final String topicName;
final Long numRecords;
final long numRecords;
final long warmupRecords;
final Integer recordSize;
final double throughput;
final boolean payloadMonotonic;
Expand All @@ -493,6 +540,7 @@ public ConfigPostProcessor(ArgumentParser parser, String[] args) throws IOExcept
Namespace namespace = parser.parseArgs(args);
this.topicName = namespace.getString("topic");
this.numRecords = namespace.getLong("numRecords");
this.warmupRecords = Math.max(namespace.getLong("warmupRecords"), 0);
this.recordSize = namespace.getInt("recordSize");
this.throughput = namespace.getDouble("throughput");
this.payloadMonotonic = namespace.getBoolean("payloadMonotonic");
Expand All @@ -503,9 +551,12 @@ public ConfigPostProcessor(ArgumentParser parser, String[] args) throws IOExcept
String payloadFilePath = namespace.getString("payloadFile");
Long transactionDurationMsArg = namespace.getLong("transactionDurationMs");
String transactionIdArg = namespace.getString("transactionalId");
if (numRecords != null && numRecords <= 0) {
if (numRecords <= 0) {
throw new ArgumentParserException("--num-records should be greater than zero", parser);
}
if (warmupRecords >= numRecords) {
throw new ArgumentParserException("The value for --warmup-records must be strictly fewer than the number of records in the test, --num-records.", parser);
}
Comment on lines +558 to +560

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This check prevents warmupRecords from being greater than or equal to numRecords. Consider adding a check to prevent the user from setting a negative value for warmupRecords.

Suggested change
if (warmupRecords >= numRecords) {
throw new ArgumentParserException("The value for --warmup-records must be strictly fewer than the number of records in the test, --num-records.", parser);
}
if (warmupRecords < 0) {
throw new ArgumentParserException("The value for --warmup-records must be non-negative.", parser);
}
if (warmupRecords >= numRecords) {

Comment on lines +558 to +560
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Warmup Validation Flaw

The validation allows negative warmup records to pass through since they're less than numRecords. While line 543 sets a default of 0 and the code later uses Math.max(warmupRecords, 0), this validation should explicitly check for negative values to prevent confusing behavior.

            if (warmupRecords < 0) {
                throw new ArgumentParserException("--warmup-records should be greater than or equal to zero", parser);
            }
            if (warmupRecords >= numRecords) {
                throw new ArgumentParserException("The value for --warmup-records must be strictly fewer than the number of records in the test, --num-records.", parser);
            }
Commitable Suggestion
Suggested change
if (warmupRecords >= numRecords) {
throw new ArgumentParserException("The value for --warmup-records must be strictly fewer than the number of records in the test, --num-records.", parser);
}
if (warmupRecords < 0) {
throw new ArgumentParserException("--warmup-records should be greater than or equal to zero", parser);
}
if (warmupRecords >= numRecords) {
throw new ArgumentParserException("The value for --warmup-records must be strictly fewer than the number of records in the test, --num-records.", parser);
}
Standards
  • Business-Rule-Input-Validation
  • Logic-Verification-Boundary-Conditions

Comment on lines +558 to +560
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Negative Warmup Records Validation Missing

The validation allows negative warmup records to pass through since they're less than numRecords. While line 544 sets a default of 0 and Math.max is used later, explicit validation would prevent confusing behavior.

Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • SRE-Input-Validation
  • DbC-Precondition

if (recordSize != null && recordSize <= 0) {
throw new ArgumentParserException("--record-size should be greater than zero", parser);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,4 +492,75 @@ public void testEnableTransactionByTransactionDurationMs() throws IOException, A
assertTrue(configs.producerProps.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG).toString()
.startsWith(ProducerPerformance.DEFAULT_TRANSACTION_ID_PREFIX));
}

@Test
public void testWarmupRecordsFractionalValue() throws Exception {
String[] args = new String[] {
"--topic", "Hello-Kafka",
"--num-records", "10",
"--warmup-records", "1.5",
"--throughput", "100",
"--record-size", "100",
"--producer-props", "bootstrap.servers=localhost:9000"};
ArgumentParser parser = ProducerPerformance.argParser();
ArgumentParserException thrown = assertThrows(ArgumentParserException.class, () -> parser.parseArgs(args));
}

@Test
public void testWarmupRecordsString() throws Exception {
String[] args = new String[] {
"--topic", "Hello-Kafka",
"--num-records", "10",
"--warmup-records", "foo",
"--throughput", "100",
"--record-size", "100",
"--producer-props", "bootstrap.servers=localhost:9000"};
ArgumentParser parser = ProducerPerformance.argParser();
ArgumentParserException thrown = assertThrows(ArgumentParserException.class, () -> parser.parseArgs(args));
}

@Test
public void testWarmupNumberOfSuccessfulSendAndClose() throws IOException {
doReturn(producerMock).when(producerPerformanceSpy).createKafkaProducer(any(Properties.class));
doAnswer(invocation -> {
producerPerformanceSpy.cb.onCompletion(null, null);
return null;
}).when(producerMock).send(any(), any());

String[] args = new String[] {
"--topic", "Hello-Kafka",
"--num-records", "10",
"--warmup-records", "2",
"--throughput", "1",
"--record-size", "100",
"--producer-props", "bootstrap.servers=localhost:9000"};
producerPerformanceSpy.start(args);

verify(producerMock, times(10)).send(any(), any());
assertEquals(10, producerPerformanceSpy.stats.totalCount());
assertEquals(10 - 2, producerPerformanceSpy.steadyStateStats.totalCount());
verify(producerMock, times(1)).close();
}

@Test
public void testWarmupNegativeRecordsNormalTest() throws IOException {
doReturn(producerMock).when(producerPerformanceSpy).createKafkaProducer(any(Properties.class));
doAnswer(invocation -> {
producerPerformanceSpy.cb.onCompletion(null, null);
return null;
}).when(producerMock).send(any(), any());

String[] args = new String[] {
"--topic", "Hello-Kafka",
"--num-records", "10",
"--warmup-records", "-1",
"--throughput", "1",
"--record-size", "100",
"--producer-props", "bootstrap.servers=localhost:9000"};
producerPerformanceSpy.start(args);

verify(producerMock, times(10)).send(any(), any());
assertEquals(10, producerPerformanceSpy.stats.totalCount());
verify(producerMock, times(1)).close();
}
}
Loading