diff --git a/src/NexusReader.Infrastructure/Services/KnowledgeService.cs b/src/NexusReader.Infrastructure/Services/KnowledgeService.cs index 9379aa8..5868f7b 100644 --- a/src/NexusReader.Infrastructure/Services/KnowledgeService.cs +++ b/src/NexusReader.Infrastructure/Services/KnowledgeService.cs @@ -15,6 +15,7 @@ using Polly.Registry; using Microsoft.Extensions.Options; using NexusReader.Infrastructure.Configuration; using Qdrant.Client; +using Qdrant.Client.Grpc; using Neo4j.Driver; namespace NexusReader.Infrastructure.Services; @@ -285,6 +286,98 @@ public class KnowledgeService : IKnowledgeService _logger.LogWarning("[KnowledgeService] Skipping invalid link {Source} -> {Target}: one or both units are missing.", linkDto.Source, linkDto.Target); } } + + // Generate and upsert vectors to Qdrant in batch + var unitsToEmbed = packet.Units + .Where(u => !string.IsNullOrEmpty(u.Content)) + .ToList(); + + if (unitsToEmbed.Any()) + { + try + { + var contents = unitsToEmbed.Select(u => u.Content).ToList(); + + var embeddingResponse = await _retryPipeline.ExecuteAsync(async ct => + await _embeddingGenerator.GenerateAsync( + contents, + new EmbeddingGenerationOptions { Dimensions = 768 }, + cancellationToken: ct), cancellationToken); + + var embeddings = embeddingResponse.ToList(); + var points = new List(); + + for (int i = 0; i < unitsToEmbed.Count; i++) + { + var unitDto = unitsToEmbed[i]; + var vector = embeddings[i].Vector.ToArray(); + + var point = new PointStruct + { + Id = GetDeterministicGuid(unitDto.Id), + Vectors = vector, + Payload = + { + ["content"] = unitDto.Content, + ["type"] = unitDto.Type ?? string.Empty, + ["tenantId"] = tenantId, + ["ebookId"] = ebookId?.ToString() ?? string.Empty, + ["metadataJson"] = JsonSerializer.Serialize(unitDto.Metadata) + } + }; + points.Add(point); + } + + if (points.Any()) + { + await EnsureCollectionExistsAsync("knowledge_units", cancellationToken); + await _qdrantClient.UpsertAsync("knowledge_units", points, cancellationToken: cancellationToken); + _logger.LogInformation("[KnowledgeService] Successfully upserted {Count} points to Qdrant collection 'knowledge_units'.", points.Count); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "[KnowledgeService] Failed to generate and upsert embeddings for knowledge units to Qdrant."); + } + } + } + + private async Task EnsureCollectionExistsAsync(string collectionName, CancellationToken cancellationToken = default) + { + try + { + var exists = await _qdrantClient.CollectionExistsAsync(collectionName, cancellationToken); + if (!exists) + { + _logger.LogInformation("[KnowledgeService] Creating Qdrant collection '{CollectionName}'...", collectionName); + await _qdrantClient.CreateCollectionAsync( + collectionName: collectionName, + vectorsConfig: new VectorParams + { + Size = 768, + Distance = Distance.Cosine + }, + cancellationToken: cancellationToken + ); + _logger.LogInformation("[KnowledgeService] Qdrant collection '{CollectionName}' created successfully.", collectionName); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "[KnowledgeService] Error ensuring Qdrant collection '{CollectionName}' exists.", collectionName); + } + } + + private static Guid GetDeterministicGuid(string input) + { + if (Guid.TryParse(input, out var guid)) + { + return guid; + } + + using var md5 = System.Security.Cryptography.MD5.Create(); + byte[] hash = md5.ComputeHash(System.Text.Encoding.UTF8.GetBytes(input)); + return new Guid(hash); } public async Task> VerifyGroundednessAsync(string answer, string context, string tenantId, CancellationToken cancellationToken = default) @@ -354,6 +447,7 @@ public class KnowledgeService : IKnowledgeService List searchResult; try { + await EnsureCollectionExistsAsync("knowledge_units", cancellationToken); var response = await _qdrantClient.SearchAsync( collectionName: "knowledge_units", vector: queryVector, @@ -417,6 +511,7 @@ public class KnowledgeService : IKnowledgeService List searchResult; try { + await EnsureCollectionExistsAsync("knowledge_units", cancellationToken); var response = await _qdrantClient.SearchAsync( collectionName: "knowledge_units", vector: queryVector, @@ -602,6 +697,7 @@ public class KnowledgeService : IKnowledgeService List searchResult; try { + await EnsureCollectionExistsAsync("knowledge_units", cancellationToken); var response = await _qdrantClient.SearchAsync( collectionName: "knowledge_units", vector: queryVector, @@ -790,6 +886,16 @@ Strict Grounding Rules: await dbContext.SemanticKnowledgeCache.ExecuteDeleteAsync(cancellationToken); await dbContext.KnowledgeUnits.ExecuteDeleteAsync(cancellationToken); await dbContext.KnowledgeUnitLinks.ExecuteDeleteAsync(cancellationToken); + + try + { + await _qdrantClient.DeleteCollectionAsync("knowledge_units", cancellationToken: cancellationToken); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "[KnowledgeService] Failed to drop Qdrant collection 'knowledge_units' during cache clear."); + } + return Result.Ok(); } catch (Exception ex)