
Batch Processing at Scale

Process large volumes of content efficiently with batch AI operations and parallel processing.


When to Use Batch Processing

Batch processing is ideal for:

  • Processing 100+ items at once
  • Scheduled or overnight jobs
  • Data migration and enrichment
  • Catalog updates
  • Bulk content generation
  • Background processing

Batch vs Real-Time

Aspect              Batch                    Real-Time
Latency tolerance   High (minutes to hours)  Low (seconds)
Volume              High (1,000s of items)   Low (1-10 items)
Cost efficiency     Higher                   Lower
Error handling      Aggregate                Individual
Use case            Background jobs          User-facing

Batch Processing Architecture

Overview

Input Data → Chunking → Parallel Processing → Results Aggregation → Output
    │           │              │                    │                │
    │           │              │                    │                │
  Upload     Split into     Run multiple        Collect and       Store or
  or stream  manageable     workers in          merge results     deliver
             batches        parallel

Key Components

  1. Input Handler - Accept and validate input data
  2. Chunker - Split into processable batches
  3. Worker Pool - Parallel AI processing
  4. Result Aggregator - Collect and merge outputs
  5. Output Handler - Store or deliver results
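
In code, these five components compose into a single pipeline. A minimal sketch (validateInput, chunk, and storeResults are placeholders for your own handlers; callPromptha is the API client used throughout this guide):

// End-to-end pipeline: each stage maps to a component above
async function runBatchPipeline(rawInput, fabricId) {
  const items = validateInput(rawInput);       // 1. Input Handler
  const batches = chunk(items, 10);            // 2. Chunker
  const settled = [];
  for (const batch of batches) {               // 3. Worker Pool
    settled.push(...await Promise.allSettled(
      batch.map(item => callPromptha('/fabrics/run', 'POST', { fabricId, inputs: item }))
    ));
  }
  const merged = settled                       // 4. Result Aggregator
    .filter(r => r.status === 'fulfilled')
    .map(r => r.value);
  return storeResults(merged);                 // 5. Output Handler
}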

Implementation

Basic Batch Processor

// `sleep` is a small promise-based delay helper used throughout this guide;
// `callPromptha` is the Promptha API client from the earlier sections.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

class BatchProcessor {
  constructor(options = {}) {
    this.concurrency = options.concurrency || 10;
    this.retryAttempts = options.retryAttempts || 3;
    this.rateLimitPerMinute = options.rateLimitPerMinute || 100;
  }

  async process(items, fabricId) {
    const results = [];
    const errors = [];

    // Process in chunks
    for (let i = 0; i < items.length; i += this.concurrency) {
      const chunk = items.slice(i, i + this.concurrency);

      const chunkResults = await Promise.allSettled(
        chunk.map(item => this.processItem(item, fabricId))
      );

      // Collect results and errors
      chunkResults.forEach((result, index) => {
        if (result.status === 'fulfilled') {
          results.push({
            input: chunk[index],
            output: result.value,
            success: true
          });
        } else {
          errors.push({
            input: chunk[index],
            error: result.reason.message,
            success: false
          });
        }
      });

      // Rate limiting
      await this.rateLimit();
    }

    return { results, errors, total: items.length };
  }

  async processItem(item, fabricId) {
    for (let attempt = 1; attempt <= this.retryAttempts; attempt++) {
      try {
        return await callPromptha('/fabrics/run', 'POST', {
          fabricId,
          inputs: item
        });
      } catch (error) {
        if (attempt === this.retryAttempts) throw error;
        await sleep(Math.pow(2, attempt) * 1000);
      }
    }
  }

  async rateLimit() {
    // Each chunk issues `concurrency` requests at once, so pause long enough
    // that the average request rate stays at or below rateLimitPerMinute.
    const msPerRequest = (60 * 1000) / this.rateLimitPerMinute;
    await sleep(msPerRequest * this.concurrency);
  }
}

// Usage
const processor = new BatchProcessor({ concurrency: 10 });
const { results, errors } = await processor.process(products, 'product-description');

Job Queue Implementation

For production workloads, use a job queue:

const Queue = require('bull');
const batchQueue = new Queue('batch-processing', {
  redis: process.env.REDIS_URL
});

// Producer: Add batch job
async function startBatchJob(items, fabricId, options = {}) {
  const job = await batchQueue.add('process-batch', {
    items,
    fabricId,
    options
  }, {
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 2000
    }
  });

  return job.id;
}

// Consumer: Process batch
batchQueue.process('process-batch', async (job) => {
  const { items, fabricId, options = {} } = job.data;
  const processor = new BatchProcessor(options);
  const chunkSize = options.concurrency || processor.concurrency;

  // Report progress as each chunk completes; processor.process handles
  // retries and rate limiting within the chunk.
  let processed = 0;
  for (let i = 0; i < items.length; i += chunkSize) {
    const chunk = items.slice(i, i + chunkSize);
    await processor.process(chunk, fabricId);
    processed += chunk.length;
    await job.progress(Math.round((processed / items.length) * 100));
  }

  return { processed, total: items.length };
});

// Monitor progress
batchQueue.on('progress', (job, progress) => {
  console.log(`Job ${job.id}: ${progress}% complete`);
});

Distributed Processing

For very large batches, distribute across workers:

class DistributedBatchProcessor {
  constructor(workers) {
    this.workers = workers; // Array of worker URLs
  }

  async process(items, fabricId) {
    // Distribute items across workers
    const itemsPerWorker = Math.ceil(items.length / this.workers.length);
    const assignments = this.workers.map((worker, i) => ({
      worker,
      items: items.slice(i * itemsPerWorker, (i + 1) * itemsPerWorker)
    }));

    // Start all workers
    const workerPromises = assignments.map(({ worker, items }) =>
      this.runWorker(worker, items, fabricId)
    );

    // Aggregate results
    const workerResults = await Promise.all(workerPromises);
    return this.aggregateResults(workerResults);
  }

  async runWorker(workerUrl, items, fabricId) {
    const response = await fetch(`${workerUrl}/process`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ items, fabricId })
    });
    if (!response.ok) {
      throw new Error(`Worker ${workerUrl} failed: ${response.status}`);
    }
    return response.json();
  }

  aggregateResults(workerResults) {
    return {
      results: workerResults.flatMap(r => r.results),
      errors: workerResults.flatMap(r => r.errors),
      total: workerResults.reduce((sum, r) => sum + r.total, 0)
    };
  }
}
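
Each worker exposes the /process endpoint the coordinator calls. A minimal sketch using Express (the framework, port, and worker URLs are assumptions for illustration, not part of Promptha):

const express = require('express');

// worker.js - runs a local BatchProcessor on its assigned slice
const app = express();
app.use(express.json({ limit: '10mb' })); // item slices can be large

app.post('/process', async (req, res) => {
  const { items, fabricId } = req.body;
  const processor = new BatchProcessor({ concurrency: 10 });
  const outcome = await processor.process(items, fabricId);
  res.json(outcome); // { results, errors, total }
});

app.listen(3000);

// Coordinator usage
const distributed = new DistributedBatchProcessor([
  'http://worker-1:3000',
  'http://worker-2:3000'
]);
const outcome = await distributed.process(items, 'product-description');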

File-Based Batch Processing

CSV Input Processing

const csv = require('csv-parser');
const fs = require('fs');

async function processCsvBatch(inputPath, fabricId, outputPath) {
  const items = [];

  // Read CSV
  await new Promise((resolve, reject) => {
    fs.createReadStream(inputPath)
      .pipe(csv())
      .on('data', (row) => items.push(row))
      .on('end', resolve)
      .on('error', reject);
  });

  console.log(`Processing ${items.length} items...`);

  // Process batch
  const processor = new BatchProcessor();
  const { results, errors } = await processor.process(items, fabricId);

  // Write results (writeCsv is a small helper, sketched below)
  const outputData = results.map(r => ({
    ...r.input,
    ai_output: r.output.result
  }));

  await writeCsv(outputPath, outputData);

  console.log(`Complete: ${results.length} success, ${errors.length} errors`);
  return { results, errors };
}
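
writeCsv is left as a helper; a minimal sketch using the csv-stringify package (one option among many CSV writers):

const { stringify } = require('csv-stringify/sync');
const fsp = require('fs/promises');

// Serialize an array of row objects to CSV with a header row
async function writeCsv(outputPath, rows) {
  await fsp.writeFile(outputPath, stringify(rows, { header: true }));
}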

Streaming Large Files

For files too large to fit in memory:

const { Transform } = require('stream');
const { pipeline } = require('stream/promises');

class BatchTransform extends Transform {
  constructor(fabricId, batchSize = 100) {
    super({ objectMode: true });
    this.fabricId = fabricId;
    this.batchSize = batchSize;
    this.buffer = [];
  }

  async _transform(item, encoding, callback) {
    this.buffer.push(item);

    if (this.buffer.length >= this.batchSize) {
      try {
        const results = await this.processBatch(this.buffer);
        this.buffer = [];
        results.forEach(r => this.push(r));
      } catch (error) {
        // Surface failures as stream errors instead of unhandled rejections
        return callback(error);
      }
    }

    callback();
  }

  async _flush(callback) {
    if (this.buffer.length > 0) {
      try {
        const results = await this.processBatch(this.buffer);
        results.forEach(r => this.push(r));
      } catch (error) {
        return callback(error);
      }
    }
    callback();
  }

  async processBatch(items) {
    // Process with AI
    const results = await Promise.all(
      items.map(item => callPromptha('/fabrics/run', 'POST', {
        fabricId: this.fabricId,
        inputs: item
      }))
    );
    return results;
  }
}

// Usage (stringify comes from the csv-stringify package)
const { stringify } = require('csv-stringify');

await pipeline(
  fs.createReadStream('input.csv').pipe(csv()),
  new BatchTransform('product-description', 50),
  stringify({ header: true }),
  fs.createWriteStream('output.csv')
);

Error Handling and Recovery

Checkpoint System

Save progress for recovery:

const fs = require('fs/promises');

// Extends BatchProcessor so processItem (with its retry logic) is inherited
class CheckpointedBatchProcessor extends BatchProcessor {
  constructor(checkpointPath, options = {}) {
    super(options);
    this.checkpointPath = checkpointPath;
  }

  async process(items, fabricId) {
    // Load checkpoint if exists
    const checkpoint = await this.loadCheckpoint();
    const startIndex = checkpoint?.lastProcessed || 0;

    const results = checkpoint?.results || [];
    const errors = checkpoint?.errors || [];

    for (let i = startIndex; i < items.length; i++) {
      try {
        const result = await this.processItem(items[i], fabricId);
        results.push({ index: i, input: items[i], output: result });
      } catch (error) {
        errors.push({ index: i, input: items[i], error: error.message });
      }

      // Save checkpoint every 10 items
      if ((i + 1) % 10 === 0) {
        await this.saveCheckpoint({
          lastProcessed: i + 1,
          results,
          errors
        });
      }
    }

    // Clear checkpoint on completion
    await this.clearCheckpoint();

    return { results, errors };
  }

  async loadCheckpoint() {
    try {
      const data = await fs.readFile(this.checkpointPath, 'utf8');
      return JSON.parse(data);
    } catch {
      return null;
    }
  }

  async saveCheckpoint(data) {
    await fs.writeFile(this.checkpointPath, JSON.stringify(data));
  }

  async clearCheckpoint() {
    await fs.unlink(this.checkpointPath).catch(() => {});
  }
}
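
A typical run resumes automatically if a previous run was interrupted (the checkpoint path is arbitrary):

// Re-running after a crash picks up from the last saved checkpoint
const processor = new CheckpointedBatchProcessor('./batch-checkpoint.json');
const { results, errors } = await processor.process(products, 'product-description');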

Dead Letter Queue

Handle persistent failures:

const deadLetterQueue = new Queue('dead-letters', { redis: process.env.REDIS_URL });

async function processWithDLQ(item, fabricId, maxAttempts = 3) {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await callPromptha('/fabrics/run', 'POST', {
        fabricId,
        inputs: item
      });
    } catch (error) {
      if (attempt === maxAttempts) {
        // Send to dead letter queue for manual review
        await deadLetterQueue.add('failed-item', {
          item,
          fabricId,
          error: error.message,
          attempts: maxAttempts,
          failedAt: new Date()
        });
        throw error;
      }
      await sleep(Math.pow(2, attempt) * 1000);
    }
  }
}

// Process dead letters manually or with retry
deadLetterQueue.process('failed-item', async (job) => {
  console.log('Manual review needed:', job.data);
  // Implement manual review workflow
});

Monitoring and Observability

Progress Tracking

class MonitoredBatchProcessor extends BatchProcessor {
  constructor(options) {
    super(options);
    this.metrics = {
      started: 0,
      completed: 0,
      failed: 0,
      totalLatency: 0
    };
  }

  async processItem(item, fabricId) {
    this.metrics.started++;
    const start = Date.now();

    try {
      const result = await super.processItem(item, fabricId);
      this.metrics.completed++;
      this.metrics.totalLatency += Date.now() - start;
      return result;
    } catch (error) {
      this.metrics.failed++;
      throw error;
    }
  }

  getStats() {
    return {
      ...this.metrics,
      // Guard against division by zero before anything has started or completed
      successRate: this.metrics.started ? this.metrics.completed / this.metrics.started : 0,
      avgLatency: this.metrics.completed ? this.metrics.totalLatency / this.metrics.completed : 0
    };
  }
}
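
One way to surface these metrics during a long run is to log them on an interval (the interval length is arbitrary):

const processor = new MonitoredBatchProcessor({ concurrency: 10 });

// Log live stats every 10 seconds while the batch runs
const statsTimer = setInterval(() => console.log(processor.getStats()), 10000);
await processor.process(products, 'product-description');
clearInterval(statsTimer);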

Alerting

async function monitorBatch(jobId, alertThresholds) {
  const job = await batchQueue.getJob(jobId);

  // Alert on high error rate. This assumes the consumer keeps running
  // `errors` and `total` counts on job.data (e.g. via job.update()).
  const errorRate = job.data.errors / job.data.total;
  if (errorRate > alertThresholds.maxErrorRate) {
    await sendAlert('High error rate in batch job', {
      jobId,
      errorRate,
      threshold: alertThresholds.maxErrorRate
    });
  }

  // Alert on slow processing
  const elapsed = Date.now() - job.processedOn;
  const expectedDuration = job.data.total * alertThresholds.msPerItem;
  if (elapsed > expectedDuration * 1.5) {
    await sendAlert('Batch job running slow', {
      jobId,
      elapsed,
      expected: expectedDuration
    });
  }
}
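
sendAlert is deliberately generic. A minimal sketch posting to a chat webhook (ALERT_WEBHOOK_URL is a placeholder environment variable, not part of Promptha):

// Post an alert message to an incoming-webhook endpoint
async function sendAlert(message, details) {
  await fetch(process.env.ALERT_WEBHOOK_URL, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ text: `${message}: ${JSON.stringify(details)}` })
  });
}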

Cost Optimization

Tiered Processing

Run the bulk of items through a cheaper model, reserving a premium model for the exceptions that need it:

// batchProcess, scoreQuality, and mergeResults are app-specific helpers
// (sketched after this example)
async function tieredBatchProcess(items, fabrics) {
  // First pass: cheap model
  const firstPass = await batchProcess(items, fabrics.budget);

  // Identify low-quality results (the 0.7 threshold is a tuning knob)
  const needsImprovement = firstPass.results.filter(
    r => scoreQuality(r.output) < 0.7
  );
  );

  // Second pass: premium model for low-quality only
  if (needsImprovement.length > 0) {
    const secondPass = await batchProcess(
      needsImprovement.map(r => r.input),
      fabrics.premium
    );

    // Merge results
    return mergeResults(firstPass, secondPass);
  }

  return firstPass;
}
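
The helpers above are application-specific. A minimal sketch, assuming outputs are scored by simple heuristics and merged by input identity (all names here are illustrative):

// Thin wrapper over BatchProcessor for a single fabric
async function batchProcess(items, fabricId) {
  return new BatchProcessor().process(items, fabricId);
}

// Crude quality heuristic: penalize very short or truncated outputs.
// Real scoring might use an evaluation fabric instead.
function scoreQuality(output) {
  const text = output.result || '';
  if (text.length < 50) return 0.3;
  if (!/[.!?]$/.test(text.trim())) return 0.5;
  return 1.0;
}

// Prefer the premium result wherever the second pass produced one
function mergeResults(firstPass, secondPass) {
  const improved = new Map(secondPass.results.map(r => [JSON.stringify(r.input), r]));
  return {
    ...firstPass,
    results: firstPass.results.map(r => improved.get(JSON.stringify(r.input)) || r)
  };
}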

Best Practices

  1. Set appropriate concurrency - Balance speed vs rate limits
  2. Implement checkpoints - Enable recovery from failures
  3. Use queues - Reliable job management
  4. Monitor progress - Track completion and errors
  5. Handle errors gracefully - Retry with backoff, use DLQ
  6. Optimize costs - Use tiered processing
  7. Test at scale - Verify with production-like volumes
  8. Document jobs - Log inputs, outputs, and metadata
