From 19090b9c94eb609ef82eeac91c0f0b5ffbc59133 Mon Sep 17 00:00:00 2001 From: Chuck1sn Date: Thu, 26 Jun 2025 18:03:00 +0800 Subject: [PATCH] add async --- .../java/com/zl/mjga/ApplicationService.java | 2 + .../zl/mjga/controller/LibraryController.java | 4 +- .../java/com/zl/mjga/service/RagService.java | 86 ++++++++----------- .../db/migration/V1_0_3__init_library.sql | 6 ++ 4 files changed, 47 insertions(+), 51 deletions(-) diff --git a/backend/src/main/java/com/zl/mjga/ApplicationService.java b/backend/src/main/java/com/zl/mjga/ApplicationService.java index 4c28dd7..7a97b28 100644 --- a/backend/src/main/java/com/zl/mjga/ApplicationService.java +++ b/backend/src/main/java/com/zl/mjga/ApplicationService.java @@ -2,7 +2,9 @@ package com.zl.mjga; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.scheduling.annotation.EnableAsync; +@EnableAsync @SpringBootApplication(scanBasePackages = {"com.zl.mjga", "org.jooq.generated"}) public class ApplicationService { diff --git a/backend/src/main/java/com/zl/mjga/controller/LibraryController.java b/backend/src/main/java/com/zl/mjga/controller/LibraryController.java index 5bcac8b..2dbc89a 100644 --- a/backend/src/main/java/com/zl/mjga/controller/LibraryController.java +++ b/backend/src/main/java/com/zl/mjga/controller/LibraryController.java @@ -61,7 +61,9 @@ public class LibraryController { @RequestPart("libraryId") Long libraryId, @RequestPart("file") MultipartFile multipartFile) throws Exception { String objectName = uploadService.uploadLibraryDoc(multipartFile); - ragService.ingestDocumentBy(libraryId, objectName, multipartFile.getOriginalFilename()); + Long libraryDocId = + ragService.createLibraryDocBy(libraryId, objectName, multipartFile.getOriginalFilename()); + ragService.embeddingAndCreateDocSegment(libraryDocId, objectName); return objectName; } } diff --git a/backend/src/main/java/com/zl/mjga/service/RagService.java b/backend/src/main/java/com/zl/mjga/service/RagService.java index 22eff95..fd0ea2b 100644 --- a/backend/src/main/java/com/zl/mjga/service/RagService.java +++ b/backend/src/main/java/com/zl/mjga/service/RagService.java @@ -31,12 +31,14 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.RandomStringUtils; import org.jooq.JSON; +import org.jooq.generated.mjga.enums.LibraryDocStatusEnum; import org.jooq.generated.mjga.tables.daos.LibraryDocSegmentDao; import org.jooq.generated.mjga.tables.pojos.LibraryDoc; import org.jooq.generated.mjga.tables.pojos.LibraryDocSegment; import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.Async; +import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; @Configuration @RequiredArgsConstructor @@ -79,62 +81,17 @@ public class RagService { libraryDocRepository.deleteById(docId); } - @Transactional(rollbackFor = Throwable.class) - public void ingestDocumentBy(Long libraryId, String objectName, String originalName) - throws Exception { - Document document = - amazonS3DocumentLoader.loadDocument( - minIoConfig.getDefaultBucket(), objectName, new ApacheTikaDocumentParser()); - ArrayList embeddingIds = new ArrayList<>(); - try { - Long libraryDocId = createLibraryDoc(objectName, originalName, document.metadata().toMap()); - DocumentByParagraphSplitter documentByParagraphSplitter = - new DocumentByParagraphSplitter(1000, 200); - documentByParagraphSplitter - .split(document) - .forEach( - textSegment -> { - Metadata metadata = textSegment.metadata(); - metadata.put("libraryId", libraryId); - Response embed = zhipuEmbeddingModel.embed(textSegment); - Integer tokenUsage = embed.tokenUsage().totalTokenCount(); - Embedding vector = embed.content(); - String embeddingId = zhiPuEmbeddingStore.add(vector, textSegment); - embeddingIds.add(embeddingId); - createLibraryDocSegment(textSegment, libraryDocId, tokenUsage, embeddingId); - }); - } catch (Exception e) { - log.error( - "文档采集失败。libraryId {} objectName {} originalName {}", - libraryId, - objectName, - originalName, - e); - if (CollectionUtils.isNotEmpty(embeddingIds)) { - zhiPuEmbeddingStore.removeAll(embeddingIds); - } - throw e; - } - } - - private void createLibraryDocSegment( - TextSegment textSegment, Long libraryDocId, Integer tokenUsage, String embeddingId) { - LibraryDocSegment libraryDocSegment = new LibraryDocSegment(); - libraryDocSegment.setDocId(libraryDocId); - libraryDocSegment.setContent(textSegment.text()); - libraryDocSegment.setTokenUsage(tokenUsage); - libraryDocSegment.setEmbeddingId(embeddingId); - libraryDocSegmentDao.insert(); - } - - private Long createLibraryDoc(String objectName, String originalName, Map meta) + public Long createLibraryDocBy(Long libraryId, String objectName, String originalName) throws JsonProcessingException { + String username = SecurityContextHolder.getContext().getAuthentication().getName(); String identify = String.format( "%d%s_%s", Instant.now().toEpochMilli(), RandomStringUtils.insecure().nextAlphabetic(6), originalName); + Map meta = new HashMap<>(); + meta.put("uploader", username); LibraryDoc libraryDoc = new LibraryDoc(); ObjectMapper objectMapper = new ObjectMapper(); String metaJson = objectMapper.writeValueAsString(meta); @@ -142,10 +99,39 @@ public class RagService { libraryDoc.setPath(objectName); libraryDoc.setName(originalName); libraryDoc.setIdentify(identify); + libraryDoc.setLibId(libraryId); + libraryDoc.setStatus(LibraryDocStatusEnum.INDEXING); + libraryDoc.setEnable(Boolean.TRUE); libraryDocRepository.insert(libraryDoc); return libraryDocRepository.fetchOneByIdentify(identify).getId(); } + @Async + public void embeddingAndCreateDocSegment(Long libraryDocId, String objectName) { + Document document = + amazonS3DocumentLoader.loadDocument( + minIoConfig.getDefaultBucket(), objectName, new ApacheTikaDocumentParser()); + List libraryDocSegments = new ArrayList<>(); + DocumentByParagraphSplitter documentByParagraphSplitter = + new DocumentByParagraphSplitter(1000, 200); + documentByParagraphSplitter + .split(document) + .forEach( + textSegment -> { + Response embed = zhipuEmbeddingModel.embed(textSegment); + Integer tokenUsage = embed.tokenUsage().totalTokenCount(); + Embedding vector = embed.content(); + String embeddingId = zhiPuEmbeddingStore.add(vector, textSegment); + LibraryDocSegment libraryDocSegment = new LibraryDocSegment(); + libraryDocSegment.setEmbeddingId(embeddingId); + libraryDocSegment.setContent(textSegment.text()); + libraryDocSegment.setTokenUsage(tokenUsage); + libraryDocSegment.setDocId(libraryDocId); + libraryDocSegments.add(libraryDocSegment); + }); + libraryDocSegmentDao.insert(libraryDocSegments); + } + public Map searchAction(String message) { Map result = new HashMap<>(); EmbeddingSearchRequest embeddingSearchRequest = diff --git a/backend/src/main/resources/db/migration/V1_0_3__init_library.sql b/backend/src/main/resources/db/migration/V1_0_3__init_library.sql index fb190cb..9a2ffd3 100644 --- a/backend/src/main/resources/db/migration/V1_0_3__init_library.sql +++ b/backend/src/main/resources/db/migration/V1_0_3__init_library.sql @@ -5,6 +5,11 @@ CREATE TABLE mjga.library ( create_time TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP ); +CREATE TYPE mjga.library_doc_status_enum AS ENUM ( + 'SUCCESS', + 'INDEXING' +); + CREATE TABLE mjga.library_doc ( id BIGSERIAL PRIMARY KEY, lib_id BIGINT NOT NULL, @@ -13,6 +18,7 @@ CREATE TABLE mjga.library_doc ( path VARCHAR NOT NULL, meta JSON NOT NULL, enable BOOLEAN NOT NULL DEFAULT true, + status mjga.library_doc_status_enum NOT NULL, create_time TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, update_time TIMESTAMPTZ, FOREIGN KEY (lib_id) REFERENCES mjga.library (id) ON DELETE CASCADE