Skip to content

Commit

Permalink
📥 feat: Import Conversations from LibreChat, ChatGPT, Chatbot UI (#2355)
Browse files Browse the repository at this point in the history
* Basic implementation of ChatGPT conversation import

* remove debug code

* Handle citations

* Fix updatedAt in import

* update default model

* Use job scheduler to handle import requests

* import job status endpoint

* Add wrapper around Agenda

* Rate limits for import endpoint

* rename import api path

* Batch save import to mongo

* Improve naming

* Add documenting comments

* Test for importers

* Change button for importing conversations

* Frontend changes

* Import job status endpoint

* Import endpoint response

* Add translations to new phrases

* Fix conversations refreshing

* cleanup unused functions

* set timeout for import job status polling

* Add documentation

* get extra spaces back

* Improve error message

* Fix translation files after merge

* fix translation files 2

* Add zh translation for import functionality

* Sync mailisearch index after import

* chore: add dummy uri for jest tests, as MONGO_URI should only be real for E2E tests

* docs: fix links

* docs: fix conversationsImport section

* fix: user role issue for librechat imports

* refactor: import conversations from json
- organize imports
- add additional jsdocs
- use multer with diskStorage to avoid loading file into memory outside of job
- use filepath instead of loading data string for imports
- replace console logs and some logger.info() with logger.debug
- only use multer for import route

* fix: undefined metadata edge case and replace ChatGtp -> ChatGpt

* Refactor importChatGptConvo function to handle undefined metadata edge case and replace ChatGtp with ChatGpt

* fix: chatgpt importer

* feat: maintain tree relationship for librechat messages

* chore: use enum

* refactor: saveMessage to use single object arg, replace console logs, add userId to log message

* chore: additional comment

* chore: multer edge case

* feat: first pass, maintain tree relationship

* chore: organize

* chore: remove log

* ci: add heirarchy test for chatgpt

* ci: test maintaining of heirarchy for librechat

* wip: allow non-text content type messages

* refactor: import content part object json string

* refactor: more content types to format

* chore: consolidate messageText formatting

* docs: update on changes, bump data-provider/config versions, update readme

* refactor(indexSync): singleton pattern for MeiliSearchClient

* refactor: debug log after batch is done

* chore: add back indexSync error handling

---------

Co-authored-by: jakubmieszczak <[email protected]>
Co-authored-by: Danny Avila <[email protected]>
  • Loading branch information
3 people authored May 2, 2024
1 parent 3b44741 commit ab6fbe4
Show file tree
Hide file tree
Showing 64 changed files with 3,795 additions and 98 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ config.local.ts
**/storageState.json
junit.xml
**/.venv/
**/venv/

# docker override file
docker-compose.override.yaml
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@
- Русский, 日本語, Svenska, 한국어, Tiếng Việt, 繁體中文, العربية, Türkçe, Nederlands, עברית
- 🤖 AI model selection: OpenAI, Azure OpenAI, BingAI, ChatGPT, Google Vertex AI, Anthropic (Claude), Plugins, Assistants API (including Azure Assistants)
- 💾 Create, Save, & Share Custom Presets
- 🎨 Customizable Dropdown & Interface: Adapts to both power users and newcomers.
- 🔄 Edit, Resubmit, and Continue messages with conversation branching
- 📥 Import Conversations from LibreChat, ChatGPT, Chatbot UI
- 📤 Export conversations as screenshots, markdown, text, json.
- 🔍 Search all messages/conversations
- 🔌 Plugins, including web access, image generation with DALL-E-3 and more
Expand Down
42 changes: 22 additions & 20 deletions api/lib/db/indexSync.js
Original file line number Diff line number Diff line change
@@ -1,32 +1,39 @@
const { MeiliSearch } = require('meilisearch');
const Message = require('~/models/schema/messageSchema');
const Conversation = require('~/models/schema/convoSchema');
const Message = require('~/models/schema/messageSchema');
const { logger } = require('~/config');

const searchEnabled = process.env?.SEARCH?.toLowerCase() === 'true';
let currentTimeout = null;

class MeiliSearchClient {
static instance = null;

static getInstance() {
if (!MeiliSearchClient.instance) {
if (!process.env.MEILI_HOST || !process.env.MEILI_MASTER_KEY) {
throw new Error('Meilisearch configuration is missing.');
}
MeiliSearchClient.instance = new MeiliSearch({
host: process.env.MEILI_HOST,
apiKey: process.env.MEILI_MASTER_KEY,
});
}
return MeiliSearchClient.instance;
}
}

// eslint-disable-next-line no-unused-vars
async function indexSync(req, res, next) {
if (!searchEnabled) {
return;
}

try {
if (!process.env.MEILI_HOST || !process.env.MEILI_MASTER_KEY || !searchEnabled) {
throw new Error('Meilisearch not configured, search will be disabled.');
}

const client = new MeiliSearch({
host: process.env.MEILI_HOST,
apiKey: process.env.MEILI_MASTER_KEY,
});
const client = MeiliSearchClient.getInstance();

const { status } = await client.health();
// logger.debug(`[indexSync] Meilisearch: ${status}`);
const result = status === 'available' && !!process.env.SEARCH;

if (!result) {
if (status !== 'available' || !process.env.SEARCH) {
throw new Error('Meilisearch not available');
}

Expand All @@ -37,12 +44,8 @@ async function indexSync(req, res, next) {
const messagesIndexed = messages.numberOfDocuments;
const convosIndexed = convos.numberOfDocuments;

logger.debug(
`[indexSync] There are ${messageCount} messages in the database, ${messagesIndexed} indexed`,
);
logger.debug(
`[indexSync] There are ${convoCount} convos in the database, ${convosIndexed} indexed`,
);
logger.debug(`[indexSync] There are ${messageCount} messages and ${messagesIndexed} indexed`);
logger.debug(`[indexSync] There are ${convoCount} convos and ${convosIndexed} indexed`);

if (messageCount !== messagesIndexed) {
logger.debug('[indexSync] Messages out of sync, indexing');
Expand All @@ -54,7 +57,6 @@ async function indexSync(req, res, next) {
Conversation.syncWithMeili();
}
} catch (err) {
// logger.debug('[indexSync] in index sync');
if (err.message.includes('not found')) {
logger.debug('[indexSync] Creating indices...');
currentTimeout = setTimeout(async () => {
Expand Down
18 changes: 18 additions & 0 deletions api/models/Conversation.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,24 @@ module.exports = {
return { message: 'Error saving conversation' };
}
},
bulkSaveConvos: async (conversations) => {
try {
const bulkOps = conversations.map((convo) => ({
updateOne: {
filter: { conversationId: convo.conversationId, user: convo.user },
update: convo,
upsert: true,
timestamps: false,
},
}));

const result = await Conversation.bulkWrite(bulkOps);
return result;
} catch (error) {
logger.error('[saveBulkConversations] Error saving conversations in bulk', error);
throw new Error('Failed to save conversations in bulk.');
}
},
getConvosByPage: async (user, pageNumber = 1, pageSize = 25) => {
try {
const totalConvos = (await Conversation.countDocuments({ user })) || 1;
Expand Down
19 changes: 19 additions & 0 deletions api/models/Message.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,25 @@ module.exports = {
throw new Error('Failed to save message.');
}
},

async bulkSaveMessages(messages) {
try {
const bulkOps = messages.map((message) => ({
updateOne: {
filter: { messageId: message.messageId },
update: message,
upsert: true,
},
}));

const result = await Message.bulkWrite(bulkOps);
return result;
} catch (err) {
logger.error('Error saving messages in bulk:', err);
throw new Error('Failed to save messages in bulk.');
}
},

/**
* Records a message in the database.
*
Expand Down
69 changes: 69 additions & 0 deletions api/server/middleware/importLimiters.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
const rateLimit = require('express-rate-limit');
const { ViolationTypes } = require('librechat-data-provider');
const logViolation = require('~/cache/logViolation');

const getEnvironmentVariables = () => {
const IMPORT_IP_MAX = parseInt(process.env.IMPORT_IP_MAX) || 100;
const IMPORT_IP_WINDOW = parseInt(process.env.IMPORT_IP_WINDOW) || 15;
const IMPORT_USER_MAX = parseInt(process.env.IMPORT_USER_MAX) || 50;
const IMPORT_USER_WINDOW = parseInt(process.env.IMPORT_USER_WINDOW) || 15;

const importIpWindowMs = IMPORT_IP_WINDOW * 60 * 1000;
const importIpMax = IMPORT_IP_MAX;
const importIpWindowInMinutes = importIpWindowMs / 60000;

const importUserWindowMs = IMPORT_USER_WINDOW * 60 * 1000;
const importUserMax = IMPORT_USER_MAX;
const importUserWindowInMinutes = importUserWindowMs / 60000;

return {
importIpWindowMs,
importIpMax,
importIpWindowInMinutes,
importUserWindowMs,
importUserMax,
importUserWindowInMinutes,
};
};

const createImportHandler = (ip = true) => {
const { importIpMax, importIpWindowInMinutes, importUserMax, importUserWindowInMinutes } =
getEnvironmentVariables();

return async (req, res) => {
const type = ViolationTypes.FILE_UPLOAD_LIMIT;
const errorMessage = {
type,
max: ip ? importIpMax : importUserMax,
limiter: ip ? 'ip' : 'user',
windowInMinutes: ip ? importIpWindowInMinutes : importUserWindowInMinutes,
};

await logViolation(req, res, type, errorMessage);
res.status(429).json({ message: 'Too many conversation import requests. Try again later' });
};
};

const createImportLimiters = () => {
const { importIpWindowMs, importIpMax, importUserWindowMs, importUserMax } =
getEnvironmentVariables();

const importIpLimiter = rateLimit({
windowMs: importIpWindowMs,
max: importIpMax,
handler: createImportHandler(),
});

const importUserLimiter = rateLimit({
windowMs: importUserWindowMs,
max: importUserMax,
handler: createImportHandler(false),
keyGenerator: function (req) {
return req.user?.id; // Use the user ID or NULL if not available
},
});

return { importIpLimiter, importUserLimiter };
};

module.exports = { createImportLimiters };
2 changes: 2 additions & 0 deletions api/server/middleware/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const validateRegistration = require('./validateRegistration');
const validateImageRequest = require('./validateImageRequest');
const moderateText = require('./moderateText');
const noIndex = require('./noIndex');
const importLimiters = require('./importLimiters');

module.exports = {
...uploadLimiters,
Expand All @@ -39,5 +40,6 @@ module.exports = {
validateModel,
moderateText,
noIndex,
...importLimiters,
checkDomainAllowed,
};
52 changes: 52 additions & 0 deletions api/server/routes/convos.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
const multer = require('multer');
const express = require('express');
const { CacheKeys } = require('librechat-data-provider');
const { initializeClient } = require('~/server/services/Endpoints/assistants');
const { getConvosByPage, deleteConvos, getConvo, saveConvo } = require('~/models/Conversation');
const { IMPORT_CONVERSATION_JOB_NAME } = require('~/server/utils/import/jobDefinition');
const { storage, importFileFilter } = require('~/server/routes/files/multer');
const requireJwtAuth = require('~/server/middleware/requireJwtAuth');
const { createImportLimiters } = require('~/server/middleware');
const jobScheduler = require('~/server/utils/jobScheduler');
const getLogStores = require('~/cache/getLogStores');
const { sleep } = require('~/server/utils');
const { logger } = require('~/config');
Expand Down Expand Up @@ -99,4 +104,51 @@ router.post('/update', async (req, res) => {
}
});

const { importIpLimiter, importUserLimiter } = createImportLimiters();
const upload = multer({ storage: storage, fileFilter: importFileFilter });

/**
* Imports a conversation from a JSON file and saves it to the database.
* @route POST /import
* @param {Express.Multer.File} req.file - The JSON file to import.
* @returns {object} 201 - success response - application/json
*/
router.post(
'/import',
importIpLimiter,
importUserLimiter,
upload.single('file'),
async (req, res) => {
try {
const filepath = req.file.path;
const job = await jobScheduler.now(IMPORT_CONVERSATION_JOB_NAME, filepath, req.user.id);

res.status(201).json({ message: 'Import started', jobId: job.id });
} catch (error) {
logger.error('Error processing file', error);
res.status(500).send('Error processing file');
}
},
);

// Get the status of an import job for polling
router.get('/import/jobs/:jobId', async (req, res) => {
try {
const { jobId } = req.params;
const { userId, ...jobStatus } = await jobScheduler.getJobStatus(jobId);
if (!jobStatus) {
return res.status(404).json({ message: 'Job not found.' });
}

if (userId !== req.user.id) {
return res.status(403).json({ message: 'Unauthorized' });
}

res.json(jobStatus);
} catch (error) {
logger.error('Error getting job details', error);
res.status(500).send('Error getting job details');
}
});

module.exports = router;
2 changes: 1 addition & 1 deletion api/server/routes/files/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const express = require('express');
const createMulterInstance = require('./multer');
const { uaParser, checkBan, requireJwtAuth, createFileLimiters } = require('~/server/middleware');
const { createMulterInstance } = require('./multer');

const files = require('./files');
const images = require('./images');
Expand Down
12 changes: 11 additions & 1 deletion api/server/routes/files/multer.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@ const storage = multer.diskStorage({
},
});

const importFileFilter = (req, file, cb) => {
if (file.mimetype === 'application/json') {
cb(null, true);
} else if (path.extname(file.originalname).toLowerCase() === '.json') {
cb(null, true);
} else {
cb(new Error('Only JSON files are allowed'), false);
}
};

const fileFilter = (req, file, cb) => {
if (!file) {
return cb(new Error('No file provided'), false);
Expand All @@ -42,4 +52,4 @@ const createMulterInstance = async () => {
});
};

module.exports = createMulterInstance;
module.exports = { createMulterInstance, storage, importFileFilter };
63 changes: 63 additions & 0 deletions api/server/services/AppService.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,69 @@ describe('AppService', () => {
expect(process.env.FILE_UPLOAD_USER_MAX).toEqual('initialUserMax');
expect(process.env.FILE_UPLOAD_USER_WINDOW).toEqual('initialUserWindow');
});

it('should not modify IMPORT environment variables without rate limits', async () => {
// Setup initial environment variables
process.env.IMPORT_IP_MAX = '10';
process.env.IMPORT_IP_WINDOW = '15';
process.env.IMPORT_USER_MAX = '5';
process.env.IMPORT_USER_WINDOW = '20';

const initialEnv = { ...process.env };

await AppService(app);

// Expect environment variables to remain unchanged
expect(process.env.IMPORT_IP_MAX).toEqual(initialEnv.IMPORT_IP_MAX);
expect(process.env.IMPORT_IP_WINDOW).toEqual(initialEnv.IMPORT_IP_WINDOW);
expect(process.env.IMPORT_USER_MAX).toEqual(initialEnv.IMPORT_USER_MAX);
expect(process.env.IMPORT_USER_WINDOW).toEqual(initialEnv.IMPORT_USER_WINDOW);
});

it('should correctly set IMPORT environment variables based on rate limits', async () => {
// Define and mock a custom configuration with rate limits
const importLimitsConfig = {
rateLimits: {
conversationsImport: {
ipMax: '150',
ipWindowInMinutes: '60',
userMax: '50',
userWindowInMinutes: '30',
},
},
};

require('./Config/loadCustomConfig').mockImplementationOnce(() =>
Promise.resolve(importLimitsConfig),
);

await AppService(app);

// Verify that process.env has been updated according to the rate limits config
expect(process.env.IMPORT_IP_MAX).toEqual('150');
expect(process.env.IMPORT_IP_WINDOW).toEqual('60');
expect(process.env.IMPORT_USER_MAX).toEqual('50');
expect(process.env.IMPORT_USER_WINDOW).toEqual('30');
});

it('should fallback to default IMPORT environment variables when rate limits are unspecified', async () => {
// Setup initial environment variables to non-default values
process.env.IMPORT_IP_MAX = 'initialMax';
process.env.IMPORT_IP_WINDOW = 'initialWindow';
process.env.IMPORT_USER_MAX = 'initialUserMax';
process.env.IMPORT_USER_WINDOW = 'initialUserWindow';

// Mock a custom configuration without specific rate limits
require('./Config/loadCustomConfig').mockImplementationOnce(() => Promise.resolve({}));

await AppService(app);

// Verify that process.env falls back to the initial values
expect(process.env.IMPORT_IP_MAX).toEqual('initialMax');
expect(process.env.IMPORT_IP_WINDOW).toEqual('initialWindow');
expect(process.env.IMPORT_USER_MAX).toEqual('initialUserMax');
expect(process.env.IMPORT_USER_WINDOW).toEqual('initialUserWindow');
});
});

describe('AppService updating app.locals and issuing warnings', () => {
Expand Down
Loading

0 comments on commit ab6fbe4

Please sign in to comment.