feat(rag): implement Qdrant dynamic collection creation, deterministic ID matching, and batch vector ingestion

This commit is contained in:
2026-05-23 20:17:41 +02:00
parent 5740d9126a
commit 97c1c309b1
@@ -15,6 +15,7 @@ using Polly.Registry;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using NexusReader.Infrastructure.Configuration; using NexusReader.Infrastructure.Configuration;
using Qdrant.Client; using Qdrant.Client;
using Qdrant.Client.Grpc;
using Neo4j.Driver; using Neo4j.Driver;
namespace NexusReader.Infrastructure.Services; namespace NexusReader.Infrastructure.Services;
@@ -285,6 +286,98 @@ public class KnowledgeService : IKnowledgeService
_logger.LogWarning("[KnowledgeService] Skipping invalid link {Source} -> {Target}: one or both units are missing.", linkDto.Source, linkDto.Target); _logger.LogWarning("[KnowledgeService] Skipping invalid link {Source} -> {Target}: one or both units are missing.", linkDto.Source, linkDto.Target);
} }
} }
// Generate and upsert vectors to Qdrant in batch
var unitsToEmbed = packet.Units
.Where(u => !string.IsNullOrEmpty(u.Content))
.ToList();
if (unitsToEmbed.Any())
{
try
{
var contents = unitsToEmbed.Select(u => u.Content).ToList();
var embeddingResponse = await _retryPipeline.ExecuteAsync(async ct =>
await _embeddingGenerator.GenerateAsync(
contents,
new EmbeddingGenerationOptions { Dimensions = 768 },
cancellationToken: ct), cancellationToken);
var embeddings = embeddingResponse.ToList();
var points = new List<PointStruct>();
for (int i = 0; i < unitsToEmbed.Count; i++)
{
var unitDto = unitsToEmbed[i];
var vector = embeddings[i].Vector.ToArray();
var point = new PointStruct
{
Id = GetDeterministicGuid(unitDto.Id),
Vectors = vector,
Payload =
{
["content"] = unitDto.Content,
["type"] = unitDto.Type ?? string.Empty,
["tenantId"] = tenantId,
["ebookId"] = ebookId?.ToString() ?? string.Empty,
["metadataJson"] = JsonSerializer.Serialize(unitDto.Metadata)
}
};
points.Add(point);
}
if (points.Any())
{
await EnsureCollectionExistsAsync("knowledge_units", cancellationToken);
await _qdrantClient.UpsertAsync("knowledge_units", points, cancellationToken: cancellationToken);
_logger.LogInformation("[KnowledgeService] Successfully upserted {Count} points to Qdrant collection 'knowledge_units'.", points.Count);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "[KnowledgeService] Failed to generate and upsert embeddings for knowledge units to Qdrant.");
}
}
}
private async Task EnsureCollectionExistsAsync(string collectionName, CancellationToken cancellationToken = default)
{
try
{
var exists = await _qdrantClient.CollectionExistsAsync(collectionName, cancellationToken);
if (!exists)
{
_logger.LogInformation("[KnowledgeService] Creating Qdrant collection '{CollectionName}'...", collectionName);
await _qdrantClient.CreateCollectionAsync(
collectionName: collectionName,
vectorsConfig: new VectorParams
{
Size = 768,
Distance = Distance.Cosine
},
cancellationToken: cancellationToken
);
_logger.LogInformation("[KnowledgeService] Qdrant collection '{CollectionName}' created successfully.", collectionName);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "[KnowledgeService] Error ensuring Qdrant collection '{CollectionName}' exists.", collectionName);
}
}
private static Guid GetDeterministicGuid(string input)
{
if (Guid.TryParse(input, out var guid))
{
return guid;
}
using var md5 = System.Security.Cryptography.MD5.Create();
byte[] hash = md5.ComputeHash(System.Text.Encoding.UTF8.GetBytes(input));
return new Guid(hash);
} }
public async Task<Result<GroundednessResult>> VerifyGroundednessAsync(string answer, string context, string tenantId, CancellationToken cancellationToken = default) public async Task<Result<GroundednessResult>> VerifyGroundednessAsync(string answer, string context, string tenantId, CancellationToken cancellationToken = default)
@@ -354,6 +447,7 @@ public class KnowledgeService : IKnowledgeService
List<Qdrant.Client.Grpc.ScoredPoint> searchResult; List<Qdrant.Client.Grpc.ScoredPoint> searchResult;
try try
{ {
await EnsureCollectionExistsAsync("knowledge_units", cancellationToken);
var response = await _qdrantClient.SearchAsync( var response = await _qdrantClient.SearchAsync(
collectionName: "knowledge_units", collectionName: "knowledge_units",
vector: queryVector, vector: queryVector,
@@ -417,6 +511,7 @@ public class KnowledgeService : IKnowledgeService
List<Qdrant.Client.Grpc.ScoredPoint> searchResult; List<Qdrant.Client.Grpc.ScoredPoint> searchResult;
try try
{ {
await EnsureCollectionExistsAsync("knowledge_units", cancellationToken);
var response = await _qdrantClient.SearchAsync( var response = await _qdrantClient.SearchAsync(
collectionName: "knowledge_units", collectionName: "knowledge_units",
vector: queryVector, vector: queryVector,
@@ -602,6 +697,7 @@ public class KnowledgeService : IKnowledgeService
List<Qdrant.Client.Grpc.ScoredPoint> searchResult; List<Qdrant.Client.Grpc.ScoredPoint> searchResult;
try try
{ {
await EnsureCollectionExistsAsync("knowledge_units", cancellationToken);
var response = await _qdrantClient.SearchAsync( var response = await _qdrantClient.SearchAsync(
collectionName: "knowledge_units", collectionName: "knowledge_units",
vector: queryVector, vector: queryVector,
@@ -790,6 +886,16 @@ Strict Grounding Rules:
await dbContext.SemanticKnowledgeCache.ExecuteDeleteAsync(cancellationToken); await dbContext.SemanticKnowledgeCache.ExecuteDeleteAsync(cancellationToken);
await dbContext.KnowledgeUnits.ExecuteDeleteAsync(cancellationToken); await dbContext.KnowledgeUnits.ExecuteDeleteAsync(cancellationToken);
await dbContext.KnowledgeUnitLinks.ExecuteDeleteAsync(cancellationToken); await dbContext.KnowledgeUnitLinks.ExecuteDeleteAsync(cancellationToken);
try
{
await _qdrantClient.DeleteCollectionAsync("knowledge_units", cancellationToken: cancellationToken);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "[KnowledgeService] Failed to drop Qdrant collection 'knowledge_units' during cache clear.");
}
return Result.Ok(); return Result.Ok();
} }
catch (Exception ex) catch (Exception ex)