final fix?
admineral committed Sep 25, 2024
1 parent 2eee23e commit f2b5b3e
Showing 1 changed file with 83 additions and 54 deletions.
137 changes: 83 additions & 54 deletions src/app/api/redis/process/route.ts
@@ -2,9 +2,8 @@ import { NextRequest, NextResponse } from 'next/server';
 import Redis from 'ioredis';
 import { z } from 'zod';
 import { createReadStream } from 'fs';
-import csv from 'csv-parser';
+import { parse } from 'csv-parse';
 import path from 'path';
-import { pipeline } from 'stream/promises';
 
 export const dynamic = 'force-dynamic';
 export const maxDuration = 300;
@@ -16,105 +15,135 @@ const env = z.object({
   NEXT_PUBLIC_IS_BUILD: z.string().optional(),
 }).parse(process.env);
 
+console.log('Environment variables validated');
+
 // Initialize Redis client
 const redis = new Redis(env.REDIS_URL, { password: env.REDIS_PASSWORD });
+console.log('Redis client initialized');
 
 // Batch size for Redis operations
 const BATCH_SIZE = 1000;
+console.log(`Batch size set to ${BATCH_SIZE}`);
 
-// Process rows in batches
-async function processBatch(batch: any[], type: 'sales' | 'price') {
+// Process combined data in batches
+async function processBatch(batch: Record<string, any>, sendProgress: (message: string) => void) {
+  console.log(`Starting to process batch of ${Object.keys(batch).length} items`);
   const pipeline = redis.pipeline();
 
-  for (const row of batch) {
-    const { Client, Warehouse, Product, ...dates } = row;
-    const key = `${Client}:${Warehouse}:${Product}`;
-
-    for (const [date, value] of Object.entries(dates)) {
-      if (value !== undefined) {
-        pipeline.hset(key, `${type}:${date}`, String(value));
-      }
+  for (const [key, data] of Object.entries(batch)) {
+    for (const [field, value] of Object.entries(data)) {
+      pipeline.hset(key, field, String(value));
     }
 
+    const [Client, Warehouse, Product] = key.split(':');
     pipeline.sadd(`index:client:${Client}`, key);
     pipeline.sadd(`index:warehouse:${Warehouse}`, key);
     pipeline.sadd(`index:product:${Product}`, key);
   }
 
-  await pipeline.exec();
+  console.log('Executing Redis pipeline');
+  const results = await pipeline.exec();
+  console.log(`Redis pipeline executed with ${results?.length} operations`);
+  sendProgress(`Uploaded batch of ${Object.keys(batch).length} items`);
 }
 
-// Stream and process a CSV file
-async function streamCSV(filePath: string, type: 'sales' | 'price', sendProgress: (message: string) => void) {
-  let rowCount = 0;
-  let batch: any[] = [];
+// Stream and process CSV files
+async function streamCSVs(sendProgress: (message: string) => void) {
+  console.log('Starting CSV streaming and processing');
+  const dataDir = path.join(process.cwd(), 'public', 'data');
+  const salesPath = path.join(dataDir, 'Sales.csv');
+  const pricePath = path.join(dataDir, 'Price.csv');
 
-  await pipeline(
-    createReadStream(filePath),
-    csv(),
-    async function* (source) {
-      for await (const row of source) {
-        batch.push(row);
-        rowCount++;
+  console.log(`Sales CSV path: ${salesPath}`);
+  console.log(`Price CSV path: ${pricePath}`);
 
-        if (batch.length >= BATCH_SIZE) {
-          await processBatch(batch, type);
-          sendProgress(`Processed ${rowCount} rows from ${type} file`);
-          batch = [];
-          yield; // Allow event loop to handle other tasks
-        }
-      }
+  let combinedBatch: Record<string, any> = {};
+  let totalProcessed = 0;
 
-      if (batch.length > 0) {
-        await processBatch(batch, type);
-      }
+  const salesParser = createReadStream(salesPath).pipe(parse({ columns: true }));
+  const priceParser = createReadStream(pricePath).pipe(parse({ columns: true }));
 
-      sendProgress(`Finished processing ${rowCount} rows from ${type} file`);
-    }
-  );
-}
+  console.log('CSV parsers created');
+
+  const processCombinedRow = async (row: any, type: 'sales' | 'price') => {
+    const { Client, Warehouse, Product, ...dates } = row;
+    const key = `${Client}:${Warehouse}:${Product}`;
+
+    if (!combinedBatch[key]) {
+      combinedBatch[key] = {};
+    }
+
+    for (const [date, value] of Object.entries(dates)) {
+      if (value !== undefined) {
+        combinedBatch[key][`${type}:${date}`] = String(value);
+      }
+    }
+
+    totalProcessed++;
+
+    if (Object.keys(combinedBatch).length >= BATCH_SIZE) {
+      console.log(`Batch size reached. Processing ${Object.keys(combinedBatch).length} items`);
+      await processBatch(combinedBatch, sendProgress);
+      combinedBatch = {};
+      sendProgress(`Processed ${totalProcessed} rows`);
+    }
+  };
 
-// Process all CSV files
-async function processCSVFiles(sendProgress: (message: string) => void) {
-  const dataDir = path.join(process.cwd(), 'public', 'data');
-  sendProgress(`Starting to process CSV files in directory: ${dataDir}`);
-
-  try {
-    await Promise.all([
-      streamCSV(path.join(dataDir, 'Sales.csv'), 'sales', sendProgress),
-      streamCSV(path.join(dataDir, 'Price.csv'), 'price', sendProgress)
-    ]);
-    sendProgress('Finished processing all CSV files');
-  } catch (error) {
-    console.error('Error processing CSV files:', error);
-    sendProgress(`Error processing CSV files: ${error instanceof Error ? error.message : String(error)}`);
-  }
+  console.log('Starting to process Sales CSV');
+  for await (const row of salesParser) {
+    await processCombinedRow(row, 'sales');
+  }
+  console.log('Finished processing Sales CSV');
+
+  console.log('Starting to process Price CSV');
+  for await (const row of priceParser) {
+    await processCombinedRow(row, 'price');
+  }
+  console.log('Finished processing Price CSV');
+
+  // Process any remaining data
+  if (Object.keys(combinedBatch).length > 0) {
+    console.log(`Processing remaining ${Object.keys(combinedBatch).length} items`);
+    await processBatch(combinedBatch, sendProgress);
+  }
+
+  console.log(`Total rows processed: ${totalProcessed}`);
+  sendProgress(`Finished processing ${totalProcessed} total rows`);
 }
 
 // GET handler
 export async function GET(request: NextRequest) {
+  console.log('GET request received');
+
   if (env.NEXT_PUBLIC_IS_BUILD === 'true') {
+    console.log('Request during build time, returning early');
     return NextResponse.json({ message: 'CSV processing is not available during build time' });
   }
 
   const stream = new ReadableStream({
     async start(controller) {
+      console.log('Starting ReadableStream');
       const sendProgress = (message: string) => {
+        console.log(`Progress: ${message}`);
         controller.enqueue(`data: ${message}\n\n`);
      };
 
       try {
-        await processCSVFiles(sendProgress);
+        console.log('Starting CSV processing');
+        await streamCSVs(sendProgress);
+        console.log('CSV processing completed successfully');
         controller.enqueue(`data: Successfully processed CSV files and stored data in Redis\n\n`);
       } catch (error) {
         console.error('Error processing CSV files:', error);
         controller.enqueue(`data: Error processing CSV files: ${error instanceof Error ? error.message : 'Unknown error'}\n\n`);
       } finally {
+        console.log('Closing ReadableStream controller');
         controller.close();
       }
     },
   });
 
+  console.log('Returning NextResponse with stream');
   return new NextResponse(stream, {
     headers: {
       'Content-Type': 'text/event-stream',
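For context, the data model this change produces: one Redis hash per Client:Warehouse:Product combination, holding both sales:<date> and price:<date> fields side by side, plus three index sets per key. A minimal read-back sketch with ioredis follows; the key 'C1:W1:P1' and the date field are hypothetical examples for illustration, not values from the repository's CSVs:

import Redis from 'ioredis';

// Same connection style as the route; REDIS_URL and REDIS_PASSWORD come from the environment.
const redis = new Redis(process.env.REDIS_URL!, { password: process.env.REDIS_PASSWORD });

async function inspect(): Promise<void> {
  // One combined hash per Client:Warehouse:Product key ('C1:W1:P1' is a made-up example).
  const fields = await redis.hgetall('C1:W1:P1');
  console.log(fields); // e.g. { 'sales:2024-01-01': '42', 'price:2024-01-01': '9.99' }

  // The index sets map each dimension back to its composite keys.
  const keys = await redis.smembers('index:client:C1');
  console.log(keys); // e.g. [ 'C1:W1:P1', 'C1:W2:P7' ]
}

inspect().finally(() => redis.quit());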

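Because the handler returns a text/event-stream response and emits data: lines, progress can be watched from the browser while the import runs. A minimal consumer sketch, assuming the route is served at /api/redis/process (inferred from the file path, not confirmed by the diff):

// Hypothetical client; the URL is an assumption based on the route's location.
const source = new EventSource('/api/redis/process');

source.onmessage = (event: MessageEvent<string>) => {
  // Each sendProgress() call on the server arrives as one SSE message,
  // e.g. "Uploaded batch of 1000 items".
  console.log('progress:', event.data);
  if (event.data.startsWith('Successfully processed') || event.data.startsWith('Error')) {
    // Close explicitly: EventSource auto-reconnects after the server closes
    // the stream, which would kick off the whole import again.
    source.close();
  }
};

source.onerror = () => source.close();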