Building a Scalable CSV Processor with Node.js, Bull, and Stream Processing

By Alejandro Morales

In today’s data-driven world, processing large CSV files efficiently is a common challenge for many organizations. This article explores a robust solution for handling CSV files with around 50,000 records, where each row requires multiple API calls for data enrichment. We’ll dive into the architecture, key technologies, and best practices used to create a scalable and performant CSV processor.

The Challenge

Our task is to process a CSV file containing approximately 50,000 records. Each record requires about 5 different API calls for data enrichment, with each call taking an average of 2-3 seconds to complete. This means that processing a single record could take up to 15 seconds, and the entire file could potentially take over 200 hours to process sequentially.

The main challenges we face are:

  1. Handling a large volume of data efficiently
  2. Managing multiple API calls for each record
  3. Ensuring the system remains responsive and doesn’t crash under load
  4. Providing progress updates and the ability to pause/resume processing

The Solution: A High-Level Overview

To address these challenges, we’ve developed a solution that leverages several key technologies and patterns:

  1. Node.js: For its non-blocking I/O and excellent performance in handling concurrent operations
  2. Bull: A Redis-based queue for Node.js, used for job management and distribution
  3. Stream Processing: To efficiently read and process the CSV file without loading it entirely into memory
  4. Prisma: An ORM for database operations, ensuring type safety and efficient queries
  5. API Integration: Various external APIs for data enrichment, including company information, LinkedIn profiles, and email validation

The solution follows these main steps:

  1. The CSV file is read using Node.js streams
  2. Each record is processed individually, with multiple API calls for data enrichment
  3. Processed records are saved to the database using Prisma
  4. Bull manages the job queue, handles retries, and provides progress updates

Now, let’s dive deeper into each component of the solution.

Stream Processing: Efficient CSV Parsing

The heart of our CSV processor is the processMasterCsv function, which uses Node.js streams to read and process the CSV file efficiently.

Node.js streams are one of the core features that make our CSV processor efficient and scalable. Let’s break down how we’re using streams and why they’re superior to a simple loop for this use case.

Stream Implementation

In our processMasterCsv function, we’re using the pipeline utility from Node.js streams:

// Dependencies assumed by this snippet: the CSV parser is presumably the
// 'csv-parser' package, imported here as `csv`
const fs = require('node:fs')
const csv = require('csv-parser')
const { pipeline } = require('node:stream/promises')

return await pipeline(
    fs.createReadStream(filename),
    csv(),
    async function* dataTransformer(source) {
        for await (const record of source) {
            source.pause()
            // Process the record: enrichment API calls and the database write
            // ...
            source.resume()
        }
    }
)

This pipeline consists of three main parts:

  1. fs.createReadStream(filename): Creates a readable stream from the CSV file.
  2. csv(): A transform stream that parses the CSV data into JavaScript objects.
  3. dataTransformer: Our custom transform stream that processes each record.

Advantages of Streams

  1. Memory Efficiency

    With streams, we read and process the file in small chunks rather than loading the entire file into memory. This is crucial when dealing with large CSV files (potentially gigabytes in size) that could easily exceed the available RAM if loaded all at once.

  2. Backpressure Management

    Streams in Node.js have built-in backpressure handling. In our code, we explicitly manage this with source.pause() and source.resume(). This ensures that we don’t overwhelm our system by reading data faster than we can process it.

  3. Asynchronous Processing

    Streams naturally work well with asynchronous operations. Our dataTransformer is an async generator function, allowing us to perform asynchronous operations (like API calls) for each record without blocking the entire process.

  4. Composability

    The pipeline pattern allows us to easily compose different stream operations. We could add more transform streams to the pipeline if needed, such as for data validation or formatting.
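
For example, a minimal sketch of adding a validation stage to the same pipeline might look like this (the field checks and the processRecord helper are assumptions for illustration, not part of the original implementation):

// Same dependencies as the snippet above
const fs = require('node:fs')
const csv = require('csv-parser')
const { pipeline } = require('node:stream/promises')

await pipeline(
    fs.createReadStream(filename),
    csv(),
    // Extra stage: drop rows that are missing the fields we need
    async function* validator(source) {
        for await (const record of source) {
            if (record.email || record.company) yield record
        }
    },
    // Existing stage: enrich and persist each record (processRecord stands in
    // for the enrichment logic described in this article)
    async function* dataTransformer(source) {
        for await (const record of source) {
            await processRecord(record)
        }
    }
)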

Comparison with a Loop Approach

Let’s compare this to a hypothetical loop-based approach:

async function processCSVWithLoop(filename) {
    const data = await fs.promises.readFile(filename, 'utf8');
    const records = parseCSV(data);
    for (const record of records) {
        await processRecord(record);
    }
}

While this looks simpler, it has several drawbacks:

  1. Memory Usage: It loads the entire file into memory at once, which is problematic for large files.
  2. Lack of Backpressure: There’s no built-in way to pause processing if it’s going too fast.
  3. Blocking Nature: The entire file must be read and parsed before the first record can be processed, so parsing never overlaps with processing the way it does in a streaming pipeline.

Detailed Stream Flow

Let’s break down how data flows through our stream pipeline:

  1. Read Stream: fs.createReadStream reads the file in chunks (typically 64KB by default).
  2. CSV Parsing: Each chunk is passed to the CSV parser, which converts it into JavaScript objects.
  3. Data Transformer: Our custom transformer receives these objects one at a time.
  4. Processing: For each record, we:
    • Pause the stream (source.pause())
    • Process the record (data enrichment, API calls, etc.)
    • Save to database
    • Resume the stream (source.resume())

This flow allows us to control the pace of processing, crucial when dealing with rate-limited APIs or database writes.
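
As a side note on step 1, the read chunk size is configurable through the highWaterMark option if the 64KB default isn't a good fit; the value below is purely illustrative:

// Read the CSV in larger 256KB chunks (illustrative value, not a recommendation)
const readStream = fs.createReadStream(filename, { highWaterMark: 256 * 1024 })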

Error Handling in Streams

Streams also provide robust error handling mechanisms. We can attach error listeners to each stream in the pipeline:

readStream.on('error', handleError);
csvParser.on('error', handleError);

The pipeline function we’re using automatically handles errors from any stream in the pipeline, making error management more straightforward.
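
In practice that means the caller only needs a single try/catch around the whole operation. Here is a hedged sketch (the processMasterCsv signature and surrounding variables are assumptions):

try {
    // pipeline() rejects if the read stream, the CSV parser, or our
    // transformer throws, so a single catch covers every stage
    await processMasterCsv(filename, job)
} catch (err) {
    console.error('CSV processing failed:', err)
    throw err // rethrow so the job queue can mark the job as failed and retry
}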

Pause and Resume Mechanics

The pause and resume functionality is key to our backpressure management:

for await (const record of source) {
    source.pause();
    try {
        await processRecord(record);
    } finally {
        source.resume();
    }
}

This ensures that even if processRecord throws an error, we always resume the stream, preventing it from stalling.

Trade-offs and Possible Optimizations

  1. Parallel Processing: We could implement a pool of worker streams to process multiple records in parallel, further improving throughput (a combined sketch of this and the next idea follows the list).

  2. Buffering: Implement a buffer of pre-fetched records to smooth out processing time variations.

  3. Adaptive Pausing: Instead of pausing after every record, we could implement an adaptive system that pauses based on system load or API rate limits.
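
As an illustration of the first two ideas combined, a rough sketch of a buffered transformer that enriches a handful of records in parallel might look like this (the batch size and the processRecord helper are assumptions, not part of the original implementation):

const BATCH_SIZE = 10 // assumed value, tune against API rate limits

async function* batchingTransformer(source) {
    let batch = []
    for await (const record of source) {
        batch.push(record)
        if (batch.length >= BATCH_SIZE) {
            // Enrich the whole batch concurrently; allSettled keeps one
            // failed record from rejecting the others
            await Promise.allSettled(batch.map((r) => processRecord(r)))
            batch = []
        }
    }
    // Flush any records left over after the stream ends
    if (batch.length > 0) {
        await Promise.allSettled(batch.map((r) => processRecord(r)))
    }
}

This stage could stand in for dataTransformer in the pipeline shown earlier; the right batch size depends on the rate limits of the enrichment APIs.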

The stream-based approach offers significant advantages in terms of memory efficiency, backpressure management, and scalability. It allows our CSV processor to handle files of any size while maintaining controlled, efficient processing. This is especially crucial when each record requires multiple time-consuming API calls.

Data Transformation and Enrichment

Once we have a record from the CSV, we perform several steps to transform and enrich the data:

  1. Normalize field names: Convert CSV headers to camelCase for consistency (see the sketch after this list)
  2. Basic data extraction: Extract known fields like name, company, and contact information
  3. Database lookups: Check if we already have information about this person or company in our database
  4. API calls for enrichment: Make multiple API calls to gather additional information, such as:
    • Getting the company website from the company name
    • Finding LinkedIn profiles based on name and company
    • Validating and enriching email addresses
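
The normalization in step 1, for instance, could be as simple as this sketch (the helper name and example headers are illustrative):

// Minimal header normalization: "Full Name" -> "fullName",
// "company_website" -> "companyWebsite"
function toCamelCase(header) {
    return header
        .trim()
        .toLowerCase()
        .replace(/[^a-z0-9]+(.)/g, (_, chr) => chr.toUpperCase())
}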

Here’s a simplified example of how we enrich the data:

if (!data.website && data.company) {
    let website = await getWebsiteFromCompany(data.company)
    data.enriched = true
    // ... (process website URL)
    data.website = website
}

if (data.website && !data.linkedin && data.fullName) {
    data.linkedin = await getLinkedinFromWebsite(data)
    data.enriched = true
}

// ... (additional enrichment steps)

Each of these enrichment steps involves an API call, which is why processing a single record can take several seconds.

Managing API Calls and Rate Limiting

When dealing with external APIs, it’s crucial to manage rate limits and handle potential errors. In our solution, we’ve implemented several strategies:

  1. Retry mechanism: Bull’s job queue system allows us to retry failed jobs automatically
  2. Error handling: We catch and log errors from API calls, ensuring that a single failure doesn’t crash the entire process
  3. Rate limiting: We keep track of API calls and pause processing if we approach rate limits

For example, here’s how we handle Google search API calls:

let googleCalled = 0
// parseInt returns NaN (not undefined) on a missing value, so ?? would never
// fall back here; || gives us the intended default of 10
const googleCallLimit = parseInt(process.env.SCRAPERAPI_LIMIT, 10) || 10
// sleep is a small promise-based delay helper, e.g.
// const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms))

// ... (in the processing loop)
if (googleCalled >= googleCallLimit) {
    await sleep(60000) // Wait for 1 minute before making more calls
    googleCalled = 0
}
googleCalled++

This approach ensures that we don’t exceed API rate limits while still processing records as quickly as possible.

Database Operations with Prisma

After enriching a record, we save it to the database using Prisma. Prisma provides type-safe database operations and efficient querying. Here’s an example of how we save a processed record:

const [recordResponse] = await prisma.$transaction([
    prisma.record.create({
        data,
    }),
    prisma.list.update({
        where: {
            id: meta.id,
        },
        data: {
            dataEnriched: {
                increment: data.enriched ? 1 : 0,
            },
        },
    }),
])

We use a transaction to ensure that both the record creation and the list update occur atomically. This prevents data inconsistencies in case of a failure during the save process.

Job Management with Bull

While our code snippet doesn’t show the Bull queue setup directly, it’s an integral part of the overall system. Bull is used to manage the CSV processing jobs, providing features like:

  1. Job queuing: Allowing multiple CSV files to be processed in parallel
  2. Progress tracking: Updating the job progress as records are processed
  3. Pause and resume: Ability to pause and resume jobs as needed
  4. Retry mechanism: Automatically retrying failed jobs
  5. Concurrency control: Limiting the number of jobs running simultaneously
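
Since the queue setup itself isn't shown in the snippets above, here is a hedged sketch of how such a queue might be wired up with Bull (the queue name, Redis URL variable, and option values are assumptions):

const Queue = require('bull')

const csvQueue = new Queue('csv-processing', process.env.REDIS_URL)

// Process up to two CSV files at a time; each job runs processMasterCsv
csvQueue.process(2, async (job) => processMasterCsv(job.data.filename, job))

// Enqueue a file with automatic retries and exponential backoff
await csvQueue.add(
    { filename: 'contacts.csv' },
    { attempts: 3, backoff: { type: 'exponential', delay: 60000 } }
)

Pausing, resuming, and concurrency control then come largely for free via queue.pause(), queue.resume(), and the concurrency argument to process().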

Here’s how we update the job progress in our code:

const newProgress = (current / total) * 100
await job.progress(newProgress)
await updateRemainingTime({
    job,
    total,
    current,
    startTime,
})

This progress update allows the system to provide real-time feedback on the processing status.
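
The updateRemainingTime helper isn't shown in the article; one hypothetical way it could estimate the remaining time is a simple linear projection from throughput so far (the actual helper may compute this differently):

// Hypothetical sketch of the ETA calculation
async function updateRemainingTime({ job, total, current, startTime }) {
    const elapsedMs = Date.now() - startTime
    const perRecordMs = current > 0 ? elapsedMs / current : 0
    const remainingMs = Math.round((total - current) * perRecordMs)
    // Persist the estimate on the job so a dashboard can display it
    await job.update({ ...job.data, remainingMs })
}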

Performance Optimization and Scalability

Several techniques are employed to optimize performance and ensure scalability:

  1. Streaming: By processing the CSV file as a stream, we avoid loading the entire file into memory, allowing us to handle files of any size.

  2. Asynchronous processing: We use async/await throughout the code to handle I/O operations without blocking the event loop.

  3. Batching: While not shown in the provided code snippet, batching database operations can significantly improve performance when dealing with large volumes of data.

  4. Caching: Implementing a caching layer for frequently accessed data (e.g., company information) can reduce the number of API calls required (a minimal sketch follows this list).

  5. Horizontal scaling: The use of Bull and Redis allows for distributing the workload across multiple worker processes or even multiple machines.
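
As a minimal illustration of the caching idea, company-to-website lookups could be memoized in front of the enrichment call. This sketch uses an in-memory Map; a production setup would more likely use Redis with a TTL:

const websiteCache = new Map()

async function getWebsiteCached(company) {
    const key = company.trim().toLowerCase()
    if (websiteCache.has(key)) return websiteCache.get(key)
    // getWebsiteFromCompany is the enrichment helper used earlier
    const website = await getWebsiteFromCompany(company)
    websiteCache.set(key, website)
    return website
}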

Monitoring and Logging

Proper monitoring and logging are crucial for maintaining and troubleshooting a system of this complexity. Our code uses a custom ConsoleWrapper for logging:

const logger = new ConsoleWrapper("FINDEMAIL::")

// ... (in the code)
logger.log("Trying to get Linkedin from Website for", data.fullName)

In a production environment, this logging system could be extended to send logs to a centralized logging service for easier monitoring and analysis.
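
ConsoleWrapper is a small custom utility; one plausible shape for it (purely illustrative, not the actual implementation) is a prefix-adding wrapper around console:

class ConsoleWrapper {
    constructor(prefix) {
        this.prefix = prefix
    }
    log(...args) {
        console.log(this.prefix, ...args)
    }
    error(...args) {
        console.error(this.prefix, ...args)
    }
}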

Error Handling and Resilience

The system implements several error handling strategies to ensure resilience:

  1. Try-catch blocks: Wrapping API calls and other potentially failing operations in try-catch blocks to prevent crashes.

  2. Graceful degradation: If an API call fails or data is missing, the system continues processing with the available information (sketched after this list).

  3. Transaction rollback: Using Prisma transactions ensures that database operations are atomic, preventing partial updates in case of failures.
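
A hedged sketch of how strategies 1 and 2 look in practice around a single enrichment call:

try {
    data.linkedin = await getLinkedinFromWebsite(data)
    data.enriched = true
} catch (err) {
    // Log and move on: the record is still saved with whatever data we have
    logger.log('LinkedIn enrichment failed for', data.fullName, err.message)
}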

Future Improvements and Considerations

While the current system is robust and scalable, there are always areas for improvement:

  1. Microservices architecture: Breaking down the enrichment steps into separate microservices could improve scalability and allow for independent scaling of different components.

  2. Machine learning integration: Implementing ML models for tasks like company classification or email validity prediction could reduce reliance on external APIs.

  3. Real-time processing: Moving towards a more real-time processing model using technologies like Apache Kafka could allow for continuous data ingestion and processing.

  4. Enhanced monitoring: Implementing more comprehensive monitoring and alerting systems to proactively identify and address issues.

  5. API caching and result sharing: Implementing a shared cache for API results could significantly reduce the number of API calls required, especially for common queries.

Conclusion

Building a scalable CSV processor capable of handling large volumes of data with complex enrichment requirements is a challenging task. By leveraging technologies like Node.js streams, Bull for job queuing, and Prisma for database operations, we’ve created a robust solution that can efficiently process 50,000+ records, each requiring multiple API calls.

The key takeaways from this project are:

  1. Use stream processing to handle large files efficiently
  2. Implement backpressure management to control resource usage
  3. Leverage job queues for better control over processing and scalability
  4. Use transactions and error handling to ensure data consistency
  5. Optimize API usage through caching and rate limiting
  6. Continuously monitor and log system performance for ongoing improvement
