add async

This commit is contained in:
Chuck1sn
2025-06-26 18:03:00 +08:00
parent cbfbd6c5dd
commit 19090b9c94
4 changed files with 47 additions and 51 deletions

View File

@@ -2,7 +2,9 @@ package com.zl.mjga;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
@EnableAsync
@SpringBootApplication(scanBasePackages = {"com.zl.mjga", "org.jooq.generated"})
public class ApplicationService {

View File

@@ -61,7 +61,9 @@ public class LibraryController {
@RequestPart("libraryId") Long libraryId, @RequestPart("file") MultipartFile multipartFile)
throws Exception {
String objectName = uploadService.uploadLibraryDoc(multipartFile);
ragService.ingestDocumentBy(libraryId, objectName, multipartFile.getOriginalFilename());
Long libraryDocId =
ragService.createLibraryDocBy(libraryId, objectName, multipartFile.getOriginalFilename());
ragService.embeddingAndCreateDocSegment(libraryDocId, objectName);
return objectName;
}
}

View File

@@ -31,12 +31,14 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.jooq.JSON;
import org.jooq.generated.mjga.enums.LibraryDocStatusEnum;
import org.jooq.generated.mjga.tables.daos.LibraryDocSegmentDao;
import org.jooq.generated.mjga.tables.pojos.LibraryDoc;
import org.jooq.generated.mjga.tables.pojos.LibraryDocSegment;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Async;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Configuration
@RequiredArgsConstructor
@@ -79,62 +81,17 @@ public class RagService {
libraryDocRepository.deleteById(docId);
}
@Transactional(rollbackFor = Throwable.class)
public void ingestDocumentBy(Long libraryId, String objectName, String originalName)
throws Exception {
Document document =
amazonS3DocumentLoader.loadDocument(
minIoConfig.getDefaultBucket(), objectName, new ApacheTikaDocumentParser());
ArrayList<String> embeddingIds = new ArrayList<>();
try {
Long libraryDocId = createLibraryDoc(objectName, originalName, document.metadata().toMap());
DocumentByParagraphSplitter documentByParagraphSplitter =
new DocumentByParagraphSplitter(1000, 200);
documentByParagraphSplitter
.split(document)
.forEach(
textSegment -> {
Metadata metadata = textSegment.metadata();
metadata.put("libraryId", libraryId);
Response<Embedding> embed = zhipuEmbeddingModel.embed(textSegment);
Integer tokenUsage = embed.tokenUsage().totalTokenCount();
Embedding vector = embed.content();
String embeddingId = zhiPuEmbeddingStore.add(vector, textSegment);
embeddingIds.add(embeddingId);
createLibraryDocSegment(textSegment, libraryDocId, tokenUsage, embeddingId);
});
} catch (Exception e) {
log.error(
"文档采集失败。libraryId {} objectName {} originalName {}",
libraryId,
objectName,
originalName,
e);
if (CollectionUtils.isNotEmpty(embeddingIds)) {
zhiPuEmbeddingStore.removeAll(embeddingIds);
}
throw e;
}
}
private void createLibraryDocSegment(
TextSegment textSegment, Long libraryDocId, Integer tokenUsage, String embeddingId) {
LibraryDocSegment libraryDocSegment = new LibraryDocSegment();
libraryDocSegment.setDocId(libraryDocId);
libraryDocSegment.setContent(textSegment.text());
libraryDocSegment.setTokenUsage(tokenUsage);
libraryDocSegment.setEmbeddingId(embeddingId);
libraryDocSegmentDao.insert();
}
private Long createLibraryDoc(String objectName, String originalName, Map meta)
public Long createLibraryDocBy(Long libraryId, String objectName, String originalName)
throws JsonProcessingException {
String username = SecurityContextHolder.getContext().getAuthentication().getName();
String identify =
String.format(
"%d%s_%s",
Instant.now().toEpochMilli(),
RandomStringUtils.insecure().nextAlphabetic(6),
originalName);
Map<String, String> meta = new HashMap<>();
meta.put("uploader", username);
LibraryDoc libraryDoc = new LibraryDoc();
ObjectMapper objectMapper = new ObjectMapper();
String metaJson = objectMapper.writeValueAsString(meta);
@@ -142,10 +99,39 @@ public class RagService {
libraryDoc.setPath(objectName);
libraryDoc.setName(originalName);
libraryDoc.setIdentify(identify);
libraryDoc.setLibId(libraryId);
libraryDoc.setStatus(LibraryDocStatusEnum.INDEXING);
libraryDoc.setEnable(Boolean.TRUE);
libraryDocRepository.insert(libraryDoc);
return libraryDocRepository.fetchOneByIdentify(identify).getId();
}
@Async
public void embeddingAndCreateDocSegment(Long libraryDocId, String objectName) {
Document document =
amazonS3DocumentLoader.loadDocument(
minIoConfig.getDefaultBucket(), objectName, new ApacheTikaDocumentParser());
List<LibraryDocSegment> libraryDocSegments = new ArrayList<>();
DocumentByParagraphSplitter documentByParagraphSplitter =
new DocumentByParagraphSplitter(1000, 200);
documentByParagraphSplitter
.split(document)
.forEach(
textSegment -> {
Response<Embedding> embed = zhipuEmbeddingModel.embed(textSegment);
Integer tokenUsage = embed.tokenUsage().totalTokenCount();
Embedding vector = embed.content();
String embeddingId = zhiPuEmbeddingStore.add(vector, textSegment);
LibraryDocSegment libraryDocSegment = new LibraryDocSegment();
libraryDocSegment.setEmbeddingId(embeddingId);
libraryDocSegment.setContent(textSegment.text());
libraryDocSegment.setTokenUsage(tokenUsage);
libraryDocSegment.setDocId(libraryDocId);
libraryDocSegments.add(libraryDocSegment);
});
libraryDocSegmentDao.insert(libraryDocSegments);
}
public Map<String, String> searchAction(String message) {
Map<String, String> result = new HashMap<>();
EmbeddingSearchRequest embeddingSearchRequest =

View File

@@ -5,6 +5,11 @@ CREATE TABLE mjga.library (
create_time TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TYPE mjga.library_doc_status_enum AS ENUM (
'SUCCESS',
'INDEXING'
);
CREATE TABLE mjga.library_doc (
id BIGSERIAL PRIMARY KEY,
lib_id BIGINT NOT NULL,
@@ -13,6 +18,7 @@ CREATE TABLE mjga.library_doc (
path VARCHAR NOT NULL,
meta JSON NOT NULL,
enable BOOLEAN NOT NULL DEFAULT true,
status mjga.library_doc_status_enum NOT NULL,
create_time TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMPTZ,
FOREIGN KEY (lib_id) REFERENCES mjga.library (id) ON DELETE CASCADE