# OneKey Prime Sync Component

A Midway.js-based component for data synchronization with MongoDB and Kafka support.
## Features

- Midway.js Component Architecture: Built as a reusable Midway.js component
- Data Synchronization: Efficient sync protocol for client-server data synchronization
- MongoDB Integration: Built-in MongoDB adapter for data persistence
- Kafka Support: Message queue integration for event-driven architecture
- Dependency Injection: Clean IoC container with dependency injection
- Type Safety: Full TypeScript implementation with DTOs
- Extensible Design: Adapter pattern for easy integration with different backends
## Installation

```bash
# Install dependencies
yarn install

# Build the component
yarn build
```

## Usage

Import the component in your Midway.js application:
```typescript
// src/configuration.ts
import { Configuration } from '@midwayjs/core';
import * as sync from '@onekeyhq/cloud-sync-server';

@Configuration({
  imports: [
    sync,
    // ... other components
  ],
})
export class MainConfiguration {
  // ...
}
```

Configure the component in your application's config file:
```typescript
// src/config/config.default.ts
export default {
  sync: {
    enableCache: true,
    cachePrefix: 'sync:',
    cacheTTL: 300,
    adapters: {
      mongodb: mongodbAdapter,
      kafka: kafkaAdapter,
    },
    dependenciesProvider: {
      updateUser: async (userId: string, data: any) => {
        // Your implementation
      },
      getTraceHeaders: () => ({
        // Your trace headers
      }),
      errors: {
        // Your error definitions
      },
    },
  },
};
```

## Project Structure

```
cloud-sync-server/
├── src/
│   ├── configuration.ts          # Component configuration
│   ├── service/
│   │   └── sync.service.ts       # Main sync service (PrimeSyncService)
│   ├── dto/
│   │   └── sync.dto.ts           # Data Transfer Objects
│   ├── interface/
│   │   ├── index.ts              # Main interfaces
│   │   ├── adapter.ts            # Adapter interfaces
│   │   └── config.ts             # Configuration interfaces
│   └── adapter/
│       ├── mongodb.adapter.ts    # MongoDB adapter example
│       └── kafka.adapter.ts      # Kafka adapter example
├── test/
│   └── sync.test.ts              # Unit tests
├── package.json
└── tsconfig.json
```

## Core Components

### PrimeSyncService

The main synchronization service that handles:
- Upload operations for client data
- Download operations for syncing to clients
- Conflict resolution
- Change locking mechanisms
- Flush operations
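The conflict-resolution step can be illustrated with a small sketch. This is not the component's actual algorithm (that lives in `sync.service.ts`); it is a hypothetical last-write-wins resolver using the `version` and `timestamp` fields shown in the upload example below:

```typescript
// Hypothetical conflict resolver; names and strategy are illustrative,
// not the component's internal implementation.
interface SyncItem {
  id: string;
  data: unknown;
  version: number;
  timestamp: number;
}

// Higher version wins; on a version tie, the newer timestamp wins.
function resolveConflict(incoming: SyncItem, stored: SyncItem): SyncItem {
  if (incoming.version !== stored.version) {
    return incoming.version > stored.version ? incoming : stored;
  }
  return incoming.timestamp >= stored.timestamp ? incoming : stored;
}
```

A real deployment may instead reject the stale write and ask the client to re-sync, which is why the service also exposes check and lock operations.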
### Data Transfer Objects (`sync.dto.ts`)

- `PrimeSyncUploadRequestDTO`: Client upload request structure
- `PrimeSyncDownloadRequestDTO`: Client download request structure
- `PrimeSyncCheckRequestDTO`: Sync status check request
- `PrimeChangeLockRequestDTO`: Lock management for concurrent updates
- `PrimeSyncFlushRequestDTO`: Flush operation request
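The canonical field layout is defined in `src/dto/sync.dto.ts`. As a rough sketch based only on the request shapes used in the API examples below (field names here are assumptions):

```typescript
// Illustrative shapes only -- consult src/dto/sync.dto.ts for the
// authoritative definitions and validation decorators.
interface PrimeSyncItem {
  id: string;
  data: Record<string, unknown>;
  version: number;
  timestamp: number;
}

interface PrimeSyncUploadRequest {
  clientId: string;
  items: PrimeSyncItem[];
}

interface PrimeSyncDownloadRequest {
  clientId: string;
  since: number;   // last sync timestamp
  limit?: number;  // page size
}
```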
### MongoDB Adapter

Handles data persistence with MongoDB:
- Document storage and retrieval
- Query optimization
- Index management
- Transaction support
### Kafka Adapter

Manages event streaming:
- Event publishing
- Topic management
- Consumer group coordination
- Event replay capabilities
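Topic management and event replay can be illustrated with a minimal in-memory event log; this is a stand-in for real Kafka topics and consumer offsets, and all names here are illustrative:

```typescript
// In-memory stand-in for a Kafka topic: an append-only log per topic,
// where "replay" re-reads from a chosen offset, as a consumer group
// would after resetting its committed offset.
type SyncEvent = { topic: string; offset: number; payload: unknown };

class InMemoryEventLog {
  private topics = new Map<string, SyncEvent[]>();

  publish(topic: string, payload: unknown): SyncEvent {
    const log = this.topics.get(topic) ?? [];
    const event: SyncEvent = { topic, offset: log.length, payload };
    log.push(event);
    this.topics.set(topic, log);
    return event;
  }

  replay(topic: string, fromOffset = 0): SyncEvent[] {
    return (this.topics.get(topic) ?? []).filter(e => e.offset >= fromOffset);
  }
}
```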
## API Reference

### upload

Upload sync items from client to server.
```typescript
const result = await syncService.upload({
  items: [
    {
      id: 'item-1',
      data: { /* your data */ },
      version: 1,
      timestamp: Date.now(),
    },
  ],
  clientId: 'client-123',
});
```

### download

Download sync items from server to client.
```typescript
const result = await syncService.download({
  since: lastSyncTimestamp,
  clientId: 'client-123',
  limit: 100,
});
```

### check

Check synchronization status.
```typescript
const status = await syncService.check({
  clientId: 'client-123',
  items: ['item-1', 'item-2'],
});
```

### changeLock

Manage locks for concurrent updates.
```typescript
await syncService.changeLock({
  itemId: 'item-1',
  action: 'lock', // or 'unlock'
  clientId: 'client-123',
});
```

### flush

Flush pending changes.
```typescript
await syncService.flush({
  clientId: 'client-123',
  force: true,
});
```

## Adapter Interfaces

### IMongodbAdapter

The MongoDB adapter must implement the following methods:
- `findUserById(userId, fields?)` - Find user by ID
- `findSyncDataByUserId(userId, condition?, fields?)` - Find sync data
- `findSyncDataWithPagination(userId, condition?, fields?, skip?, limit?)` - Find sync data with pagination
- `bulkWriteSyncData(operations)` - Bulk write sync data operations
- `deleteSyncDataByUserId(userId)` - Delete all sync data for a user
- `bulkWriteSyncHistory(userId, nonce, records)` - Bulk write sync history
- `findLockByUserId(userId, fields?)` - Find lock by user ID
- `upsertLock(userId, lock, nonce)` - Create or update lock
- `deleteLockByUserId(userId)` - Delete lock by user ID
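For unit tests, an in-memory stub can satisfy this surface. The method names follow the list above; the return shapes and the bulk-operation format are assumptions to be aligned with your own models:

```typescript
// In-memory test double for the MongoDB adapter surface. Return shapes
// and the `operations` format are assumptions, not the component's contract.
class InMemoryMongodbAdapter {
  private users = new Map<string, any>();
  private syncData = new Map<string, any[]>();
  private locks = new Map<string, any>();
  private history: any[] = [];

  async findUserById(userId: string): Promise<any> {
    return this.users.get(userId) ?? null;
  }
  async findSyncDataByUserId(userId: string, condition: any = {}): Promise<any[]> {
    return (this.syncData.get(userId) ?? []).filter(d =>
      Object.entries(condition).every(([k, v]) => d[k] === v));
  }
  async findSyncDataWithPagination(
    userId: string, condition: any = {}, _fields?: any, skip = 0, limit = 50,
  ): Promise<any[]> {
    const all = await this.findSyncDataByUserId(userId, condition);
    return all.slice(skip, skip + limit);
  }
  async bulkWriteSyncData(operations: Array<{ userId: string; doc: any }>): Promise<void> {
    for (const op of operations) {
      const list = this.syncData.get(op.userId) ?? [];
      list.push(op.doc);
      this.syncData.set(op.userId, list);
    }
  }
  async deleteSyncDataByUserId(userId: string): Promise<void> {
    this.syncData.delete(userId);
  }
  async bulkWriteSyncHistory(userId: string, nonce: number, records: any[]): Promise<void> {
    this.history.push({ userId, nonce, records });
  }
  async findLockByUserId(userId: string): Promise<any> {
    return this.locks.get(userId) ?? null;
  }
  async upsertLock(userId: string, lock: any, nonce: number): Promise<void> {
    this.locks.set(userId, { ...lock, nonce });
  }
  async deleteLockByUserId(userId: string): Promise<void> {
    this.locks.delete(userId);
  }
}
```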
### IKafkaAdapter

The Kafka adapter must implement:

- `sendPrimeNotification(data)` - Send prime notification to Kafka
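A matching test double for the Kafka side only needs to record what would have been produced (the payload shape is an assumption):

```typescript
// Capturing test double: stores notifications instead of producing
// them to a broker, so tests can assert on what was sent.
class CapturingKafkaAdapter {
  sent: unknown[] = [];

  async sendPrimeNotification(data: unknown): Promise<void> {
    this.sent.push(data);
  }
}
```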
## Development

```bash
# Build TypeScript
yarn build

# Watch mode for development
yarn dev
yarn watch

# Run tests
yarn test
yarn test:watch
yarn test:cov

# Linting
yarn lint
yarn lint:fix
```

## Testing

The component includes comprehensive unit tests using Jest:
```bash
# Run all tests
yarn test

# Run tests in watch mode
yarn test:watch

# Generate coverage report
yarn test:cov
```

Test files are located in the `test/` directory.
## Code Style

ESLint is configured with TypeScript support:

```bash
# Check code style
yarn lint

# Auto-fix issues
yarn lint:fix
```

## Examples

### Using the Service in a Controller

```typescript
import { Controller, Inject, Post, Body } from '@midwayjs/core';
import {
  PrimeSyncService,
  PrimeSyncUploadRequestDTO,
  PrimeSyncDownloadRequestDTO,
} from '@onekeyhq/cloud-sync-server';

@Controller('/api/sync')
export class SyncController {
  @Inject()
  primeSyncService: PrimeSyncService;

  @Post('/upload')
  async upload(@Body() body: PrimeSyncUploadRequestDTO) {
    return await this.primeSyncService.upload(body);
  }

  @Post('/download')
  async download(@Body() body: PrimeSyncDownloadRequestDTO) {
    return await this.primeSyncService.download(body);
  }
}
```

### Implementing the MongoDB Adapter

```typescript
import { IMongodbAdapter } from '@onekeyhq/cloud-sync-server';
import { Model } from 'mongoose';

export class MongodbAdapterImpl implements IMongodbAdapter {
  constructor(
    private userModel: Model<any>,
    private deviceModel: Model<any>,
    private syncModel: Model<any>,
    private syncHistoryModel: Model<any>,
    private syncLockModel: Model<any>
  ) {}

  async findUserById(userId: string, fields?: Record<string, number>): Promise<any> {
    return this.userModel.findOne({ userId, deleted: false }, fields).lean();
  }

  async findSyncDataByUserId(userId: string, condition?: any, fields?: Record<string, number>): Promise<any[]> {
    const query = { userId, deleted: false, ...condition };
    return this.syncModel.find(query, fields).lean();
  }

  // Implement other methods...
}
```

### Providing Dependencies

```typescript
const dependenciesProvider: ISyncDependenciesProvider = {
  updateUser: async (userId: string, data: any) => {
    // Update user logic
  },
  getTraceHeaders: () => ({
    'x-request-id': generateRequestId(),
    'x-trace-id': generateTraceId(),
  }),
  errors: {
    NotFound: class extends Error {
      constructor(message: string) {
        super(message);
        this.name = 'NotFound';
      }
    },
    // Other error classes
  },
};
```

## Configuration Options

| Option | Type | Default | Description |
|---|---|---|---|
| `enableCache` | `boolean` | `false` | Enable caching mechanism |
| `cachePrefix` | `string` | `'sync:'` | Cache key prefix |
| `cacheTTL` | `number` | `300` | Cache TTL in seconds |
| `adapters` | `ISyncAdapters` | `undefined` | MongoDB and Kafka adapters |
| `dependenciesProvider` | `ISyncDependenciesProvider` | `undefined` | External dependencies |
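How these options combine is internal to the component. As a hypothetical illustration only (the actual key scheme may differ), a cache key under `cachePrefix` might be built like this:

```typescript
// Hypothetical cache-key helper based on the options above; not the
// component's actual implementation.
interface CacheOptions {
  enableCache: boolean;
  cachePrefix: string;
  cacheTTL: number; // seconds
}

function cacheKey(opts: CacheOptions, userId: string, resource: string): string | null {
  if (!opts.enableCache) return null; // caching disabled entirely
  return `${opts.cachePrefix}${resource}:${userId}`;
}
```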
## Error Handling

The component uses custom error classes for different scenarios:

- `NotFound`: Resource not found
- `Conflict`: Sync conflict detected
- `ValidationError`: Invalid input data
- `LockError`: Lock acquisition failed
- `TimeoutError`: Operation timeout
## Performance Considerations

- Caching: Enable caching for frequently accessed data
- Batch Operations: Use batch upload/download for better performance
- Indexing: Ensure proper MongoDB indexes for query optimization
- Connection Pooling: Configure appropriate connection pool sizes
- Pagination: Use the `limit` parameter in download operations
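The pagination advice can be wrapped in a small generic helper that keeps fetching pages until a short page signals the end; `fetchPage` here is a hypothetical stand-in for a call like `syncService.download`:

```typescript
// Generic pagination loop: fetch pages of `limit` items until a page
// comes back smaller than `limit`, meaning the data set is exhausted.
async function downloadAll<T>(
  fetchPage: (offset: number, limit: number) => Promise<T[]>,
  limit = 100,
): Promise<T[]> {
  const all: T[] = [];
  let offset = 0;
  for (;;) {
    const page = await fetchPage(offset, limit);
    all.push(...page);
    if (page.length < limit) return all; // last (short) page reached
    offset += page.length;
  }
}
```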
## Security

- Input validation using DTOs
- Authentication through dependency provider
- Rate limiting should be implemented at application level
- Sanitize all user inputs
- Use HTTPS in production
## Troubleshooting

1. **MongoDB Connection Failed**
   - Check the MongoDB connection string
   - Verify network connectivity
   - Ensure MongoDB is running

2. **Kafka Connection Issues**
   - Verify Kafka broker addresses
   - Check topic existence
   - Review consumer group configuration

3. **Sync Conflicts**
   - Implement a proper conflict resolution strategy
   - Use versioning for items
   - Consider using locks for critical updates

4. **Performance Issues**
   - Enable caching
   - Optimize MongoDB queries
   - Use batch operations
   - Monitor memory usage
## Migration

- Update import statements
- Migrate configuration format
- Update adapter implementations
- Review breaking changes in CHANGELOG
## Contributing

1. Fork the repository
2. Create your feature branch (`git checkout -b feature/amazing-feature`)
3. Write tests for your changes
4. Ensure all tests pass (`yarn test`)
5. Commit your changes (`git commit -m 'Add amazing feature'`)
6. Push to the branch (`git push origin feature/amazing-feature`)
7. Open a Pull Request
## License

This project is part of the OneKey ecosystem.
## Support

For issues and questions, please open an issue on GitHub or contact the OneKey development team.