From f2b5b3eb1e6fe02455f9a7dcad1015930b593f08 Mon Sep 17 00:00:00 2001
From: Elias
Date: Wed, 25 Sep 2024 03:29:31 +0200
Subject: [PATCH] final fix?

---
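Notes (below the "---" fold, so git am drops them from the commit
message):

The per-file streamCSV/processCSVFiles pair is replaced by a single
streamCSVs pass that folds both files into one batch keyed by
Client:Warehouse:Product, so the sales:<date> and price:<date> fields
for the same key land in the same Redis hash instead of racing through
two concurrent pipelines. csv-parser is swapped for csv-parse, whose
parser stream can be consumed with for await directly, which also
removes the need for the stream/promises pipeline and the generator
workaround.

For anyone who wants to watch the progress events while testing, here
is a minimal client-side sketch. It assumes the route is served at
/api/redis/process (the App Router path for this file) and that every
progress message arrives as an SSE "data:" frame, as in the handler
below; the function name watchImport is made up. fetch is used instead
of EventSource so the browser does not auto-reconnect, and re-trigger
the whole import, once the stream closes:

    // Sketch only: consume the SSE progress stream from this route.
    async function watchImport(): Promise<void> {
      const res = await fetch('/api/redis/process');
      if (!res.body) throw new Error('no response body');
      const reader = res.body.getReader();
      const decoder = new TextDecoder();
      let buffer = '';
      for (;;) {
        const { done, value } = await reader.read();
        if (done) break;
        buffer += decoder.decode(value, { stream: true });
        const frames = buffer.split('\n\n'); // SSE frames end in a blank line
        buffer = frames.pop() ?? '';
        for (const frame of frames) {
          if (frame.startsWith('data: ')) console.log(frame.slice(6));
        }
      }
    }

To spot-check the result afterwards: the key and index shapes below
come straight from processBatch, but the concrete IDs are invented,
and an ioredis client like the route's `redis` is assumed in scope:

    // Hypothetical IDs; real key/index shapes from processBatch.
    const row = await redis.hgetall('ACME:WH1:SKU-42');
    // e.g. { 'sales:2024-01-01': '12', 'price:2024-01-01': '9.99', ... }
    const members = await redis.smembers('index:client:ACME');
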
 src/app/api/redis/process/route.ts | 137 +++++++++++++++++------------
 1 file changed, 83 insertions(+), 54 deletions(-)

diff --git a/src/app/api/redis/process/route.ts b/src/app/api/redis/process/route.ts
index 957af23..b473efb 100644
--- a/src/app/api/redis/process/route.ts
+++ b/src/app/api/redis/process/route.ts
@@ -2,9 +2,8 @@ import { NextRequest, NextResponse } from 'next/server';
 import Redis from 'ioredis';
 import { z } from 'zod';
 import { createReadStream } from 'fs';
-import csv from 'csv-parser';
+import { parse } from 'csv-parse';
 import path from 'path';
-import { pipeline } from 'stream/promises';
 
 export const dynamic = 'force-dynamic';
 export const maxDuration = 300;
@@ -16,105 +15,135 @@ const env = z.object({
   NEXT_PUBLIC_IS_BUILD: z.string().optional(),
 }).parse(process.env);
 
+console.log('Environment variables validated');
+
 // Initialize Redis client
 const redis = new Redis(env.REDIS_URL, { password: env.REDIS_PASSWORD });
+console.log('Redis client initialized');
 
 // Batch size for Redis operations
 const BATCH_SIZE = 1000;
+console.log(`Batch size set to ${BATCH_SIZE}`);
 
-// Process rows in batches
-async function processBatch(batch: any[], type: 'sales' | 'price') {
+// Process combined data in batches
+async function processBatch(batch: Record<string, Record<string, string>>, sendProgress: (message: string) => void) {
+  console.log(`Starting to process batch of ${Object.keys(batch).length} items`);
   const pipeline = redis.pipeline();
 
-  for (const row of batch) {
-    const { Client, Warehouse, Product, ...dates } = row;
-    const key = `${Client}:${Warehouse}:${Product}`;
-
-    for (const [date, value] of Object.entries(dates)) {
-      if (value !== undefined) {
-        pipeline.hset(key, `${type}:${date}`, String(value));
-      }
+  for (const [key, data] of Object.entries(batch)) {
+    for (const [field, value] of Object.entries(data)) {
+      pipeline.hset(key, field, String(value));
     }
-
+
+    const [Client, Warehouse, Product] = key.split(':');
     pipeline.sadd(`index:client:${Client}`, key);
     pipeline.sadd(`index:warehouse:${Warehouse}`, key);
     pipeline.sadd(`index:product:${Product}`, key);
   }
 
-  await pipeline.exec();
+  console.log('Executing Redis pipeline');
+  const results = await pipeline.exec();
+  console.log(`Redis pipeline executed with ${results?.length} operations`);
+  sendProgress(`Uploaded batch of ${Object.keys(batch).length} items`);
 }
 
-// Stream and process a CSV file
-async function streamCSV(filePath: string, type: 'sales' | 'price', sendProgress: (message: string) => void) {
-  let rowCount = 0;
-  let batch: any[] = [];
-
-  await pipeline(
-    createReadStream(filePath),
-    csv(),
-    async function* (source) {
-      for await (const row of source) {
-        batch.push(row);
-        rowCount++;
-
-        if (batch.length >= BATCH_SIZE) {
-          await processBatch(batch, type);
-          sendProgress(`Processed ${rowCount} rows from ${type} file`);
-          batch = [];
-          yield; // Allow event loop to handle other tasks
-        }
-      }
+// Stream and process CSV files
+async function streamCSVs(sendProgress: (message: string) => void) {
+  console.log('Starting CSV streaming and processing');
+  const dataDir = path.join(process.cwd(), 'public', 'data');
+  const salesPath = path.join(dataDir, 'Sales.csv');
+  const pricePath = path.join(dataDir, 'Price.csv');
+
+  console.log(`Sales CSV path: ${salesPath}`);
+  console.log(`Price CSV path: ${pricePath}`);
+
+  let combinedBatch: Record<string, Record<string, string>> = {};
+  let totalProcessed = 0;
 
-      if (batch.length > 0) {
-        await processBatch(batch, type);
+  const salesParser = createReadStream(salesPath).pipe(parse({ columns: true }));
+  const priceParser = createReadStream(pricePath).pipe(parse({ columns: true }));
+
+  console.log('CSV parsers created');
+
+  const processCombinedRow = async (row: any, type: 'sales' | 'price') => {
+    const { Client, Warehouse, Product, ...dates } = row;
+    const key = `${Client}:${Warehouse}:${Product}`;
+
+    if (!combinedBatch[key]) {
+      combinedBatch[key] = {};
+    }
+
+    for (const [date, value] of Object.entries(dates)) {
+      if (value !== undefined) {
+        combinedBatch[key][`${type}:${date}`] = String(value);
       }
+    }
 
-      sendProgress(`Finished processing ${rowCount} rows from ${type} file`);
+    totalProcessed++;
+
+    if (Object.keys(combinedBatch).length >= BATCH_SIZE) {
+      console.log(`Batch size reached. Processing ${Object.keys(combinedBatch).length} items`);
+      await processBatch(combinedBatch, sendProgress);
+      combinedBatch = {};
+      sendProgress(`Processed ${totalProcessed} rows`);
     }
-  );
-}
+  };
 
-// Process all CSV files
-async function processCSVFiles(sendProgress: (message: string) => void) {
-  const dataDir = path.join(process.cwd(), 'public', 'data');
-  sendProgress(`Starting to process CSV files in directory: ${dataDir}`);
-
-  try {
-    await Promise.all([
-      streamCSV(path.join(dataDir, 'Sales.csv'), 'sales', sendProgress),
-      streamCSV(path.join(dataDir, 'Price.csv'), 'price', sendProgress)
-    ]);
-    sendProgress('Finished processing all CSV files');
-  } catch (error) {
-    console.error('Error processing CSV files:', error);
-    sendProgress(`Error processing CSV files: ${error instanceof Error ? error.message : String(error)}`);
+  console.log('Starting to process Sales CSV');
+  for await (const row of salesParser) {
+    await processCombinedRow(row, 'sales');
+  }
+  console.log('Finished processing Sales CSV');
+
+  console.log('Starting to process Price CSV');
+  for await (const row of priceParser) {
+    await processCombinedRow(row, 'price');
   }
+  console.log('Finished processing Price CSV');
+
+  // Process any remaining data
+  if (Object.keys(combinedBatch).length > 0) {
+    console.log(`Processing remaining ${Object.keys(combinedBatch).length} items`);
+    await processBatch(combinedBatch, sendProgress);
+  }
+
+  console.log(`Total rows processed: ${totalProcessed}`);
+  sendProgress(`Finished processing ${totalProcessed} total rows`);
 }
 
 // GET handler
 export async function GET(request: NextRequest) {
+  console.log('GET request received');
+
   if (env.NEXT_PUBLIC_IS_BUILD === 'true') {
+    console.log('Request during build time, returning early');
     return NextResponse.json({ message: 'CSV processing is not available during build time' });
   }
 
   const stream = new ReadableStream({
     async start(controller) {
+      console.log('Starting ReadableStream');
       const sendProgress = (message: string) => {
+        console.log(`Progress: ${message}`);
         controller.enqueue(`data: ${message}\n\n`);
       };
 
       try {
-        await processCSVFiles(sendProgress);
+        console.log('Starting CSV processing');
+        await streamCSVs(sendProgress);
+        console.log('CSV processing completed successfully');
         controller.enqueue(`data: Successfully processed CSV files and stored data in Redis\n\n`);
       } catch (error) {
         console.error('Error processing CSV files:', error);
         controller.enqueue(`data: Error processing CSV files: ${error instanceof Error ? error.message : 'Unknown error'}\n\n`);
       } finally {
+        console.log('Closing ReadableStream controller');
         controller.close();
       }
     },
   });
 
+  console.log('Returning NextResponse with stream');
   return new NextResponse(stream, {
     headers: {
       'Content-Type': 'text/event-stream',