Description
Environment Information
- OS: Mac
- Node Version: v21.7.2
- NPM Version: 10.5.0
- C++ Toolchain: VSCode + support pack
- "@confluentinc/schemaregistry": "^1.1.0"
- "@confluentinc/kafka-javascript": "^1.2.0"
Steps to Reproduce
Use this file:
// require('kafkajs') is replaced with require('@confluentinc/kafka-javascript').KafkaJS.
const { Kafka } = require("@confluentinc/kafka-javascript").KafkaJS;
const { faker } = require("@faker-js/faker");
require("dotenv").config();
const {
  SerdeType,
  BearerAuthCredentials,
  SchemaRegistryClient,
  SchemaInfo,
} = require("@confluentinc/schemaregistry");
const axios = require('axios');
const JsonSerializer = require("@confluentinc/schemaregistry").JsonSerializer;
const kafkaConfig = {};
kafkaConfig.clientId = "kafka-nodejs-console-sample-producer";
kafkaConfig.brokers = [process.env.BOOTSTRAP_SERVERS || "127.0.0.1:9092"];
console.log(kafkaConfig.brokers);
const isLocal = kafkaConfig.brokers.some(broker =>
  broker.includes('localhost') || broker.includes('127.0.0.1')
);
const createAxiosDefaults = () => ({
  timeout: 10000,
  headers: axios.defaults.headers
});
// Default to SSL/SASL for non-local brokers.
// Feel free to remove this if it doesn't fit your needs.
if (!isLocal) {
  kafkaConfig.ssl = true;
  kafkaConfig.sasl = {
    mechanism: "PLAIN",
    username: process.env.CC_API_KEY || "{{ cc_api_key }}",
    password: process.env.CC_API_SECRET || "{{ cc_api_secret }}"
  };
}
const kafka = new Kafka({ kafkaJS: kafkaConfig });
const producer = kafka.producer();
const schemaRegistryClient = new SchemaRegistryClient({
  baseURLs: ['http://localhost:8081'],
  cacheCapacity: 512,
  createAxiosDefaults: createAxiosDefaults,
  localAuthCredentials: {
    credentialsSource: 'USER_INFO',
    userInfo: 'RBACAllowedUser-lsrc1:nohash',
  }
});
const topic = process.env.KAFKA_TOPIC || "{{ topic }}";
const schemaString = JSON.stringify({
  type: 'record',
  name: 'DatabaseRecord',
  namespace: 'com.example',
  fields: [
    { name: 'collation', type: 'string' },
    { name: 'column', type: 'string' },
    { name: 'engine', type: 'string' },
    { name: 'mongodbObjectId', type: 'string' },
    { name: 'type', type: 'string' }
  ],
});
const schemaInfo = {
  schemaType: 'AVRO',
  schema: schemaString,
};
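// Aside: the schema above is an Avro record schema (schemaType 'AVRO'), yet it is
// handed to a JsonSerializer below. The constructor TypeError reported in this
// issue is thrown before that mismatch could come into play.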
// Declared outside the try block so run() below can reference it.
let serializer;
try {
  serializer = new JsonSerializer({
    schemaRegistryClient: schemaRegistryClient,
    subjectNamingStrategy: SerdeType.RECORD_NAME_STRATEGY,
    cacheCapacity: 512
  });
  console.log("Serializer created successfully:", serializer);
} catch (error) {
  console.error("Error creating serializer:", error);
}
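// With @confluentinc/schemaregistry 1.1.0 the catch above fires with:
// TypeError: Cannot read properties of undefined (reading 'cacheCapacity')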
// The producer should always be connected at app initialization, separately from producing messages.
const connectProducer = async () => {
  await producer.connect();
  console.log("Connected successfully");
};
const run = async () => {
  const MAX_MESSAGES = 10;
  try {
    for (let i = 0; i < MAX_MESSAGES; i++) {
      let messageValue = {
        collation: faker.database.collation(),
        column: faker.database.column(),
        engine: faker.database.engine(),
        mongodbObjectId: faker.database.mongodbObjectId(),
        type: faker.database.type(),
      };
      await producer.send({
        topic: topic,
        messages: [{ value: await serializer.serialize(topic, messageValue, schemaInfo) }],
      });
      console.log(`Sent message ${i + 1}/${MAX_MESSAGES}: ${JSON.stringify(messageValue)}`);
      await new Promise((resolve) => setTimeout(resolve, 1000));
    }
    console.log('Reached maximum message count');
  } catch (err) {
    console.error("Error thrown during send:", err);
  } finally {
    await producer.disconnect();
  }
  console.log("Disconnected successfully");
};
(async () => {
  await connectProducer();
  await run();
})();
const shutdown = async () => {
  try {
    await producer.disconnect();
    console.log("Producer disconnected successfully");
    process.exit(0);
  } catch (error) {
    console.error("Error during shutdown:", error);
    process.exit(1);
  }
};
const signalTraps = ["SIGTERM", "SIGINT", "SIGUSR2"];
process.on("unhandledRejection", async (error) => {
console.error("Unhandled rejection:", error);
await shutdown();
});
process.on("uncaughtException", async (error) => {
console.error("Uncaught exception:", error);
await shutdown();
});
signalTraps.forEach((type) => {
process.once(type, async () => {
console.log(`Received ${type}`);
await shutdown();
});
});
module.exports = { producer };
and you'll need to run npm install with this package.json:
{
  "dependencies": {
    "@confluentinc/schemaregistry": "^1.1.0",
    "@faker-js/faker": "^8.4.1",
    "axios": "^1.8.1",
    "dotenv": "^16.4.5"
  },
  "devDependencies": {
    "@confluentinc/kafka-javascript": "^1.2.0",
    "@types/node": "^20.12.7",
    "chai": "^4.5.0",
    "mocha": "^10.8.2"
  },
  "scripts": {
    "test": "mocha test/integration/**/*.test.js --timeout 30000",
    "test:watch": "mocha test/integration/**/*.test.js --timeout 30000 --watch",
    "test:docker": "docker-compose -f test/integration/docker-compose.test.yml up --build --abort-on-container-exit --exit-code-from test",
    "test:docker:clean": "docker-compose -f test/integration/docker-compose.test.yml down -v",
    "test:debug": "DEBUG=kafka* npm run test"
  }
}
Then run node producer.js.
You will get:
❯ node producer.js
[ '127.0.0.1:9092' ]
Error creating serializer: TypeError: Cannot read properties of undefined (reading 'cacheCapacity')
at new JsonSerializer (/Users/lcerchie/vscode-registry/templates/javascript-client/src/node_modules/@confluentinc/schemaregistry/dist/serde/json.js:65:79)
at Object.<anonymous> (/Users/lcerchie/vscode-registry/templates/javascript-client/src/producer.js:70:22)
confluent-kafka-javascript Configuration Settings
None, this is local and should work as configured above.
Additional context
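For what it's worth: the stack trace points at the JsonSerializer constructor reading cacheCapacity off an undefined value, which looks consistent with the serializer expecting positional arguments (client, serdeType, config) rather than the single options object I passed, so the third (config) parameter ends up undefined. A minimal sketch of what I'd guess the intended call looks like, replacing the construction inside the try block above; SerdeType.VALUE and the config shape are unverified assumptions on my part:

// Hypothetical workaround sketch, not verified: assume positional args
// (client, serdeType, config), where config is the object the constructor
// reads cacheCapacity from.
serializer = new JsonSerializer(schemaRegistryClient, SerdeType.VALUE, {
  cacheCapacity: 512,
});

If that is the intended API, a clearer error (or support for the options-object form) would have saved me some time here.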