Batch Processing at Scale
Process large volumes of content efficiently with batch AI operations and parallel processing.
When to Use Batch Processing
Batch processing is ideal for:
- Processing 100+ items at once
- Scheduled or overnight jobs
- Data migration and enrichment
- Catalog updates
- Bulk content generation
- Background processing
Batch vs Real-Time
| Aspect | Batch | Real-Time |
|---|---|---|
| Latency tolerance | High (minutes-hours) | Low (seconds) |
| Volume | High (1000s) | Low (1-10) |
| Cost efficiency | Higher | Lower |
| Error handling | Aggregate | Individual |
| Use case | Background jobs | User-facing |
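One practical consequence of batch's high latency tolerance: because throughput is bounded by the rate limit rather than by user patience, a job's duration is easy to estimate up front. A quick sketch, with illustrative numbers:

// Back-of-envelope duration estimate for a rate-limited batch job.
// Illustrative: 10,000 items at 100 requests/minute take ~100 minutes,
// regardless of concurrency.
function estimateBatchMinutes(itemCount, rateLimitPerMinute) {
  return Math.ceil(itemCount / rateLimitPerMinute);
}

console.log(estimateBatchMinutes(10000, 100)); // 100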
Batch Processing Architecture
Overview
Input Data → Chunking → Parallel Processing → Results Aggregation → Output
(upload or stream → split into manageable batches → run multiple workers in parallel → collect and merge results → store or deliver)
Key Components
- Input Handler - Accept and validate input data
- Chunker - Split into processable batches
- Worker Pool - Parallel AI processing
- Result Aggregator - Collect and merge outputs
- Output Handler - Store or deliver results
Implementation
Basic Batch Processor
// callPromptha() and sleep() are assumed helpers (API request + delay)
class BatchProcessor {
  constructor(options = {}) {
    this.concurrency = options.concurrency || 10;
    this.retryAttempts = options.retryAttempts || 3;
    this.rateLimitPerMinute = options.rateLimitPerMinute || 100;
  }

  async process(items, fabricId) {
    const results = [];
    const errors = [];

    // Process in chunks of `concurrency` items
    for (let i = 0; i < items.length; i += this.concurrency) {
      const chunk = items.slice(i, i + this.concurrency);
      const chunkResults = await Promise.allSettled(
        chunk.map(item => this.processItem(item, fabricId))
      );

      // Collect results and errors
      chunkResults.forEach((result, index) => {
        if (result.status === 'fulfilled') {
          results.push({
            input: chunk[index],
            output: result.value,
            success: true
          });
        } else {
          errors.push({
            input: chunk[index],
            error: result.reason.message,
            success: false
          });
        }
      });

      // Rate limiting between chunks
      await this.rateLimit();
    }

    return { results, errors, total: items.length };
  }

  async processItem(item, fabricId) {
    // Retry with exponential backoff (2s, 4s, 8s, ...)
    for (let attempt = 1; attempt <= this.retryAttempts; attempt++) {
      try {
        return await callPromptha('/fabrics/run', 'POST', {
          fabricId,
          inputs: item
        });
      } catch (error) {
        if (attempt === this.retryAttempts) throw error;
        await sleep(Math.pow(2, attempt) * 1000);
      }
    }
  }

  async rateLimit() {
    // Pause long enough that each chunk stays within the per-minute budget
    const msPerRequest = (60 * 1000) / this.rateLimitPerMinute;
    await sleep(msPerRequest * this.concurrency);
  }
}

// Usage
const processor = new BatchProcessor({ concurrency: 10 });
const { results, errors } = await processor.process(products, 'product-description');
Job Queue Implementation
For production workloads, use a job queue such as Bull, backed by Redis:
const Queue = require('bull');

const batchQueue = new Queue('batch-processing', {
  redis: process.env.REDIS_URL
});

// Split an array into fixed-size chunks
function* chunks(array, size = 10) {
  for (let i = 0; i < array.length; i += size) {
    yield array.slice(i, i + size);
  }
}

// Producer: Add batch job
async function startBatchJob(items, fabricId, options = {}) {
  const job = await batchQueue.add('process-batch', {
    items,
    fabricId,
    options
  }, {
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 2000
    }
  });
  return job.id;
}

// Consumer: Process batch, updating progress as each chunk completes
batchQueue.process('process-batch', async (job) => {
  const { items, fabricId, options } = job.data;
  const processor = new BatchProcessor(options);

  let processed = 0;
  for (const chunk of chunks(items, options.concurrency)) {
    await processor.process(chunk, fabricId);
    processed += chunk.length;
    await job.progress(Math.round((processed / items.length) * 100));
  }

  return { processed, total: items.length };
});

// Monitor progress
batchQueue.on('progress', (job, progress) => {
  console.log(`Job ${job.id}: ${progress}% complete`);
});
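With the queue wired up, kicking off a batch and waiting for the result looks like this (job.finished() is Bull's built-in way to await completion):

// Enqueue a batch and await its completion
const jobId = await startBatchJob(products, 'product-description', {
  concurrency: 10
});

const job = await batchQueue.getJob(jobId);
const { processed, total } = await job.finished();
console.log(`Processed ${processed}/${total} items`);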
Distributed Processing
For very large batches, distribute across workers:
class DistributedBatchProcessor {
  constructor(workers) {
    this.workers = workers; // Array of worker URLs
  }

  async process(items, fabricId) {
    // Distribute items evenly across workers
    const itemsPerWorker = Math.ceil(items.length / this.workers.length);
    const assignments = this.workers.map((worker, i) => ({
      worker,
      items: items.slice(i * itemsPerWorker, (i + 1) * itemsPerWorker)
    }));

    // Start all workers in parallel
    const workerPromises = assignments.map(({ worker, items }) =>
      this.runWorker(worker, items, fabricId)
    );

    // Aggregate results
    const workerResults = await Promise.all(workerPromises);
    return this.aggregateResults(workerResults);
  }

  async runWorker(workerUrl, items, fabricId) {
    const response = await fetch(`${workerUrl}/process`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ items, fabricId })
    });
    if (!response.ok) {
      throw new Error(`Worker ${workerUrl} failed: ${response.status}`);
    }
    return response.json();
  }

  aggregateResults(workerResults) {
    return {
      results: workerResults.flatMap(r => r.results),
      errors: workerResults.flatMap(r => r.errors),
      total: workerResults.reduce((sum, r) => sum + r.total, 0)
    };
  }
}
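Each worker exposes a /process endpoint matching the request above. A minimal sketch, assuming an Express server and the BatchProcessor class from earlier (adapt to your framework):

// Hypothetical worker endpoint: receives a slice of the batch,
// processes it with the BatchProcessor defined above, returns results
const express = require('express');
const app = express();
app.use(express.json({ limit: '50mb' })); // batch payloads can be large

app.post('/process', async (req, res) => {
  const { items, fabricId } = req.body;
  const processor = new BatchProcessor();
  const { results, errors, total } = await processor.process(items, fabricId);
  res.json({ results, errors, total });
});

app.listen(process.env.PORT || 3000);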
File-Based Batch Processing
CSV Input Processing
const csv = require('csv-parser');
const fs = require('fs');

async function processCsvBatch(inputPath, fabricId, outputPath) {
  const items = [];

  // Read CSV into memory
  await new Promise((resolve, reject) => {
    fs.createReadStream(inputPath)
      .pipe(csv())
      .on('data', (row) => items.push(row))
      .on('end', resolve)
      .on('error', reject);
  });

  console.log(`Processing ${items.length} items...`);

  // Process batch
  const processor = new BatchProcessor();
  const { results, errors } = await processor.process(items, fabricId);

  // Write results alongside the original columns
  const outputData = results.map(r => ({
    ...r.input,
    ai_output: r.output.result
  }));
  await writeCsv(outputPath, outputData);

  console.log(`Complete: ${results.length} success, ${errors.length} errors`);
  return { results, errors };
}
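writeCsv is left undefined above; one possible implementation, assuming the csv-stringify package (an assumption, not a requirement of this guide):

// Hypothetical writeCsv helper using csv-stringify's sync API
const { stringify } = require('csv-stringify/sync');
const fsp = require('fs/promises');

async function writeCsv(path, rows) {
  const output = stringify(rows, { header: true });
  await fsp.writeFile(path, output);
}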
Streaming Large Files
For files too large to fit in memory:
const { Transform } = require('stream');
const { pipeline } = require('stream/promises');
const { stringify } = require('csv-stringify');

class BatchTransform extends Transform {
  constructor(fabricId, batchSize = 100) {
    super({ objectMode: true });
    this.fabricId = fabricId;
    this.batchSize = batchSize;
    this.buffer = [];
  }

  async _transform(item, encoding, callback) {
    this.buffer.push(item);
    if (this.buffer.length >= this.batchSize) {
      try {
        const results = await this.processBatch(this.buffer);
        this.buffer = [];
        results.forEach(r => this.push(r));
      } catch (error) {
        return callback(error);
      }
    }
    callback();
  }

  async _flush(callback) {
    // Process any items left in the buffer when the stream ends
    if (this.buffer.length > 0) {
      try {
        const results = await this.processBatch(this.buffer);
        results.forEach(r => this.push(r));
      } catch (error) {
        return callback(error);
      }
    }
    callback();
  }

  async processBatch(items) {
    // Process with AI
    const results = await Promise.all(
      items.map(item => callPromptha('/fabrics/run', 'POST', {
        fabricId: this.fabricId,
        inputs: item
      }))
    );
    return results;
  }
}

// Usage
await pipeline(
  fs.createReadStream('input.csv').pipe(csv()),
  new BatchTransform('product-description', 50),
  stringify({ header: true }),
  fs.createWriteStream('output.csv')
);
Error Handling and Recovery
Checkpoint System
Save progress for recovery:
const fsp = require('fs/promises');

// Extends BatchProcessor so processItem() (with retries) is inherited
class CheckpointedBatchProcessor extends BatchProcessor {
  constructor(checkpointPath, options = {}) {
    super(options);
    this.checkpointPath = checkpointPath;
  }

  async process(items, fabricId) {
    // Load checkpoint if one exists, then resume from it
    const checkpoint = await this.loadCheckpoint();
    const startIndex = checkpoint?.lastProcessed || 0;
    const results = checkpoint?.results || [];
    const errors = checkpoint?.errors || [];

    for (let i = startIndex; i < items.length; i++) {
      try {
        const result = await this.processItem(items[i], fabricId);
        results.push({ index: i, input: items[i], output: result });
      } catch (error) {
        errors.push({ index: i, input: items[i], error: error.message });
      }

      // Save checkpoint every 10 items
      if ((i + 1) % 10 === 0) {
        await this.saveCheckpoint({
          lastProcessed: i + 1,
          results,
          errors
        });
      }
    }

    // Clear checkpoint on completion
    await this.clearCheckpoint();
    return { results, errors };
  }

  async loadCheckpoint() {
    try {
      const data = await fsp.readFile(this.checkpointPath, 'utf8');
      return JSON.parse(data);
    } catch {
      return null; // No checkpoint yet
    }
  }

  async saveCheckpoint(data) {
    await fsp.writeFile(this.checkpointPath, JSON.stringify(data));
  }

  async clearCheckpoint() {
    await fsp.unlink(this.checkpointPath).catch(() => {});
  }
}
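If the process crashes mid-run, re-running the same command resumes from the last saved checkpoint:

// Re-running after a crash picks up where the checkpoint left off
const processor = new CheckpointedBatchProcessor('./batch-checkpoint.json');
const { results, errors } = await processor.process(products, 'product-description');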
Dead Letter Queue
Handle persistent failures:
const deadLetterQueue = new Queue('dead-letters', {
  redis: process.env.REDIS_URL
});

async function processWithDLQ(item, fabricId, maxAttempts = 3) {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await callPromptha('/fabrics/run', 'POST', {
        fabricId,
        inputs: item
      });
    } catch (error) {
      if (attempt === maxAttempts) {
        // Send to dead letter queue for manual review
        await deadLetterQueue.add('failed-item', {
          item,
          fabricId,
          error: error.message,
          attempts: maxAttempts,
          failedAt: new Date()
        });
        throw error;
      }
      await sleep(Math.pow(2, attempt) * 1000);
    }
  }
}

// Process dead letters manually or with retry
deadLetterQueue.process('failed-item', async (job) => {
  console.log('Manual review needed:', job.data);
  // Implement manual review workflow
});
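An alternative to auto-processing dead letters is to leave the queue unconsumed and drain it manually once the underlying issue is fixed; a minimal sketch:

// Manual drain: assumes no consumer is registered on deadLetterQueue,
// so failed items sit in the waiting state until requeued here
async function requeueDeadLetters() {
  const jobs = await deadLetterQueue.getJobs(['waiting', 'delayed']);
  for (const job of jobs) {
    const { item, fabricId } = job.data;
    await processWithDLQ(item, fabricId);
    await job.remove();
  }
}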
Monitoring and Observability
Progress Tracking
class MonitoredBatchProcessor extends BatchProcessor {
  constructor(options) {
    super(options);
    this.metrics = {
      started: 0,
      completed: 0,
      failed: 0,
      totalLatency: 0
    };
  }

  async processItem(item, fabricId) {
    this.metrics.started++;
    const start = Date.now();
    try {
      const result = await super.processItem(item, fabricId);
      this.metrics.completed++;
      this.metrics.totalLatency += Date.now() - start;
      return result;
    } catch (error) {
      this.metrics.failed++;
      throw error;
    }
  }

  getStats() {
    return {
      ...this.metrics,
      successRate: this.metrics.completed / this.metrics.started,
      avgLatency: this.metrics.totalLatency / this.metrics.completed
    };
  }
}
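Logging these stats on an interval gives a live view of a long-running batch:

// Log live stats every 30 seconds while a batch runs
const processor = new MonitoredBatchProcessor({ concurrency: 10 });
const timer = setInterval(() => console.log(processor.getStats()), 30000);

try {
  await processor.process(products, 'product-description');
} finally {
  clearInterval(timer);
  console.log('Final stats:', processor.getStats());
}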
Alerting
async function monitorBatch(jobId, alertThresholds) {
  const job = await batchQueue.getJob(jobId);

  // Alert on high error rate (assumes the consumer records running
  // `errors` and `total` counts on the job's data)
  const errorRate = job.data.errors / job.data.total;
  if (errorRate > alertThresholds.maxErrorRate) {
    await sendAlert('High error rate in batch job', {
      jobId,
      errorRate,
      threshold: alertThresholds.maxErrorRate
    });
  }

  // Alert on slow processing (processedOn is when the job started)
  const elapsed = Date.now() - job.processedOn;
  const expectedDuration = job.data.total * alertThresholds.msPerItem;
  if (elapsed > expectedDuration * 1.5) {
    await sendAlert('Batch job running slow', {
      jobId,
      elapsed,
      expected: expectedDuration
    });
  }
}
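sendAlert can be anything from an email to a pager. A minimal sketch, assuming a Slack-style incoming webhook URL in ALERT_WEBHOOK_URL (an assumption, not part of this guide's API):

// Hypothetical sendAlert helper: posts to a Slack-style incoming webhook.
// ALERT_WEBHOOK_URL is an assumed environment variable.
async function sendAlert(message, details) {
  await fetch(process.env.ALERT_WEBHOOK_URL, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      text: `${message}\n${JSON.stringify(details, null, 2)}`
    })
  });
}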
Cost Optimization
Tiered Processing
Use cheaper models for bulk, premium for exceptions:
async function tieredBatchProcess(items, fabrics) {
  // First pass: cheap model
  const firstPass = await batchProcess(items, fabrics.budget);

  // Identify low-quality results
  const needsImprovement = firstPass.results.filter(
    r => scoreQuality(r.output) < 0.7
  );

  // Second pass: premium model for low-quality results only
  if (needsImprovement.length > 0) {
    const secondPass = await batchProcess(
      needsImprovement.map(r => r.input),
      fabrics.premium
    );

    // Merge results, preferring premium output where it exists
    return mergeResults(firstPass, secondPass);
  }

  return firstPass;
}
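scoreQuality and mergeResults are left open above. One simple sketch scores on output length and merges by input identity; both heuristics are assumptions, so tune them to your content:

// Hypothetical helpers for the tiered flow above.
// scoreQuality: crude length-based heuristic (0..1); replace with a
// rule-based or AI-based scorer for real workloads.
function scoreQuality(output) {
  const text = output?.result || '';
  return Math.min(text.length / 200, 1);
}

// mergeResults: premium results replace their budget counterparts,
// matched by input identity
function mergeResults(firstPass, secondPass) {
  const improved = new Map(
    secondPass.results.map(r => [JSON.stringify(r.input), r])
  );
  return {
    ...firstPass,
    results: firstPass.results.map(
      r => improved.get(JSON.stringify(r.input)) || r
    ),
    errors: [...firstPass.errors, ...secondPass.errors]
  };
}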
Best Practices
- Set appropriate concurrency - Balance speed vs rate limits
- Implement checkpoints - Enable recovery from failures
- Use queues - Reliable job management
- Monitor progress - Track completion and errors
- Handle errors gracefully - Retry with backoff, use DLQ
- Optimize costs - Use tiered processing
- Test at scale - Verify with production-like volumes
- Document jobs - Log inputs, outputs, and metadata