feat: KM-RAG Polyglot Ingestion Pipeline Migration (#46)

Resolves the KM-RAG Polyglot Persistence and Background Ingestion Pipeline Migration task.

### Key Changes
1. **Infrastructure Migration**: Integrated Qdrant (for vector embeddings) and Neo4j (for concept graphs), reducing reliance on PostgreSQL pgvector storage.
2. **Concurrent Background Job**: Implemented a robust Hangfire `EbookIngestionJob` that uses Polly exponential-backoff retries to handle transient HTTP 429 (rate-limit) responses and runs its three core ingestion tasks concurrently via `Task.WhenAll`.
3. **Data Layer**: Standardized database schemas and entities; retained `Pgvector.EntityFrameworkCore` for migration compilation compatibility.
4. **Wasm Client & Tests**: Implemented client support for semantic search and refactored related tests in `QueryTests.cs` to mock `IKnowledgeService`.

### Verification Status
- **Build**: Successfully compiles with `dotnet build NexusReader.slnx --no-restore` (0 errors).
- **Tests**: All 5 unit tests pass cleanly with `dotnet test NexusReader.slnx --no-restore`.

**Resolves** #47

---------

Co-authored-by: Marek Jasiński <jasins.marek@gmail.com>
Reviewed-on: #46
Reviewed-by: Marek Jasiński <jasins.marek@gmail.com>
Co-authored-by: Antigravity <antigravity@google.com>
Co-committed-by: Antigravity <antigravity@google.com>
This commit was merged in pull request #46.
This commit is contained in:
2026-05-20 18:15:28 +00:00
committed by Marek Jasiński
parent 711822f5de
commit 23acaeb705
15 changed files with 348 additions and 287 deletions
@@ -14,8 +14,8 @@ using Polly;
using Polly.Registry;
using Microsoft.Extensions.Options;
using NexusReader.Infrastructure.Configuration;
using Pgvector;
using Pgvector.EntityFrameworkCore;
using Qdrant.Client;
using Neo4j.Driver;
namespace NexusReader.Infrastructure.Services;
@@ -30,6 +30,8 @@ public class KnowledgeService : IKnowledgeService
private readonly AiSettings _settings;
private readonly Tokenizer _tokenizer;
private readonly ILogger<KnowledgeService> _logger;
private readonly QdrantClient _qdrantClient;
private readonly IDriver _neo4jDriver;
private const string PromptVersion = "1.3";
private static readonly ConcurrentDictionary<string, Lazy<Task<Result<KnowledgePacket>>>> _activeRequests = new();
@@ -39,7 +41,9 @@ public class KnowledgeService : IKnowledgeService
IDbContextFactory<AppDbContext> dbContextFactory,
ResiliencePipelineProvider<string> pipelineProvider,
IOptions<AiSettings> settings,
ILogger<KnowledgeService> logger)
ILogger<KnowledgeService> logger,
QdrantClient qdrantClient,
IDriver neo4jDriver)
{
_chatClient = chatClient;
_embeddingGenerator = embeddingGenerator;
@@ -47,6 +51,8 @@ public class KnowledgeService : IKnowledgeService
_retryPipeline = pipelineProvider.GetPipeline("ai-retry");
_settings = settings.Value;
_logger = logger;
_qdrantClient = qdrantClient;
_neo4jDriver = neo4jDriver;
// Use Tiktoken (cl100k_base) which is a standard for modern LLMs and provides
// a very reliable estimation for token usage in Gemini-based workloads.
_tokenizer = TiktokenTokenizer.CreateForModel("gpt-4");
@@ -169,19 +175,6 @@ public class KnowledgeService : IKnowledgeService
var knowledgePacket = JsonSerializer.Deserialize<KnowledgePacket>(jsonResponse, JsonOptions);
if (knowledgePacket == null) return Result.Fail("Failed to deserialize AI response.");
// 3. Generate Embedding if not present
float[]? vector = null;
try
{
var embeddingResponse = await _retryPipeline.ExecuteAsync(async ct =>
await _embeddingGenerator.GenerateAsync(new[] { normalizedText }, new EmbeddingGenerationOptions { Dimensions = 1536 }, cancellationToken: ct));
vector = embeddingResponse.First().Vector.ToArray();
}
catch (Exception ex)
{
_logger.LogWarning(ex, "[KnowledgeService] Embedding generation failed; proceeding without vector.");
}
// 4. Save to Cache
var cached = await dbContext.SemanticKnowledgeCache
.FirstOrDefaultAsync(c => c.ContentHash == hash && c.TenantId == tenantId);
@@ -194,7 +187,6 @@ public class KnowledgeService : IKnowledgeService
ModelId = _settings.Model,
PromptVersion = PromptVersion,
TenantId = tenantId,
Vector = vector != null ? new Vector(vector) : null,
CreatedAt = DateTime.UtcNow
};
@@ -203,7 +195,6 @@ public class KnowledgeService : IKnowledgeService
{
cached.JsonData = jsonResponse;
cached.OriginalText = normalizedText;
cached.Vector = vector != null ? new Vector(vector) : null;
cached.CreatedAt = DateTime.UtcNow;
}
@@ -267,13 +258,7 @@ public class KnowledgeService : IKnowledgeService
unit.MetadataJson = JsonSerializer.Serialize(unitDto.Metadata);
try
{
var emb = await _retryPipeline.ExecuteAsync(async ct =>
await _embeddingGenerator.GenerateAsync(new[] { unit.Content }, new EmbeddingGenerationOptions { Dimensions = 768 }, cancellationToken: ct), cancellationToken);
unit.Vector = new Vector(emb.First().Vector.ToArray());
}
catch { /* Ignore embedding errors for now */ }
// Embeddings and vector storage are handled via Qdrant in the new pipeline.
processedUnitIds.Add(unit.Id);
}
@@ -342,21 +327,54 @@ public class KnowledgeService : IKnowledgeService
public async Task<Result<List<RelevantContext>>> GetRelevantContextAsync(string query, string tenantId, CancellationToken cancellationToken = default)
{
using var dbContext = await _dbContextFactory.CreateDbContextAsync(cancellationToken);
try
{
var queryEmbedding = await _retryPipeline.ExecuteAsync(async ct =>
await _embeddingGenerator.GenerateAsync(new[] { query }, cancellationToken: ct), cancellationToken);
var queryVector = new Vector(queryEmbedding.First().Vector.ToArray());
var queryVector = queryEmbedding.First().Vector.ToArray();
var relevantUnits = await dbContext.KnowledgeUnits
.Where(u => u.TenantId == tenantId)
.OrderBy(u => u.Vector!.L2Distance(queryVector))
.Take(5)
.Select(u => new RelevantContext { Text = u.Content, Confidence = 1.0 })
.ToListAsync(cancellationToken);
var filter = new Qdrant.Client.Grpc.Filter();
filter.Should.Add(new Qdrant.Client.Grpc.Condition
{
Field = new Qdrant.Client.Grpc.FieldCondition
{
Key = "tenantId",
Match = new Qdrant.Client.Grpc.Match { Text = tenantId }
}
});
filter.Should.Add(new Qdrant.Client.Grpc.Condition
{
Field = new Qdrant.Client.Grpc.FieldCondition
{
Key = "tenantId",
Match = new Qdrant.Client.Grpc.Match { Text = "global" }
}
});
return Result.Ok(relevantUnits);
List<Qdrant.Client.Grpc.ScoredPoint> searchResult;
try
{
var response = await _qdrantClient.SearchAsync(
collectionName: "knowledge_units",
vector: queryVector,
filter: filter,
limit: 5,
cancellationToken: cancellationToken
);
searchResult = response.ToList();
}
catch (Exception)
{
searchResult = new List<Qdrant.Client.Grpc.ScoredPoint>();
}
var contexts = searchResult.Select(point => new RelevantContext
{
Text = point.Payload.TryGetValue("content", out var cv) ? cv.StringValue : string.Empty,
Confidence = point.Score
}).ToList();
return Result.Ok(contexts);
}
catch (Exception ex)
{
@@ -364,6 +382,170 @@ public class KnowledgeService : IKnowledgeService
}
}
/// <summary>
/// Runs a semantic search over the knowledge units visible to a tenant.
/// The query is embedded (768 dimensions), matched against the Qdrant
/// <c>knowledge_units</c> collection, enriched with <c>DEFINES</c> neighbours from Neo4j,
/// and decorated with ebook titles loaded from the relational database.
/// </summary>
/// <param name="queryText">Free-text query to embed and search for.</param>
/// <param name="tenantId">Tenant whose points are searched; points tagged <c>global</c> are also included.</param>
/// <param name="limit">Maximum number of hits to return. Non-positive values yield an empty result.</param>
/// <param name="cancellationToken">Token used to cancel the embedding, Qdrant and database calls.</param>
/// <returns>
/// A successful result with the mapped hits (possibly empty), or a failed result wrapping the
/// exception when embedding generation, title lookup or mapping throws. Qdrant and Neo4j failures
/// are logged and degrade to "no hits" / "no graph context" respectively.
/// </returns>
public async Task<Result<List<SemanticSearchResultDto>>> SearchLibrarySemanticallyAsync(string queryText, string tenantId, int limit, CancellationToken cancellationToken = default)
{
    // A negative limit would wrap around to a huge value when cast to ulong below,
    // and a blank query has nothing meaningful to embed.
    if (limit <= 0 || string.IsNullOrWhiteSpace(queryText))
    {
        return Result.Ok(new List<SemanticSearchResultDto>());
    }

    try
    {
        // 1. Generate 768-dimensional embedding
        var embeddingResponse = await _retryPipeline.ExecuteAsync(async ct =>
            await _embeddingGenerator.GenerateAsync(
                new[] { queryText },
                new EmbeddingGenerationOptions { Dimensions = 768 },
                cancellationToken: ct), cancellationToken);

        var queryVector = embeddingResponse.First().Vector.ToArray();

        // 2. Query Qdrant (tenant-scoped OR global points)
        var filter = BuildTenantScopeFilter(tenantId);

        List<Qdrant.Client.Grpc.ScoredPoint> searchResult;
        try
        {
            var response = await _qdrantClient.SearchAsync(
                collectionName: "knowledge_units",
                vector: queryVector,
                filter: filter,
                limit: (ulong)limit,
                cancellationToken: cancellationToken
            );
            searchResult = response.ToList();
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "[KnowledgeService] Failed to search in Qdrant; collection might not exist yet.");
            searchResult = new List<Qdrant.Client.Grpc.ScoredPoint>();
        }

        if (searchResult.Count == 0)
        {
            return Result.Ok(new List<SemanticSearchResultDto>());
        }

        // 3. Graph Expansion via Neo4j
        var candidateIds = searchResult
            .Select(r => GetPointIdString(r.Id))
            .Where(id => id.Length > 0)
            .Distinct()
            .ToList();

        var definitions = new Dictionary<string, List<string>>();

        if (candidateIds.Count > 0)
        {
            try
            {
                await using var session = _neo4jDriver.AsyncSession();
                const string cypher =
                    "MATCH (source:KnowledgeUnit)-[r:DEFINES]->(target:KnowledgeUnit) " +
                    "WHERE source.id IN $candidateIds " +
                    "RETURN source.id AS sourceId, target.content AS targetContent";

                var neoResult = await session.ExecuteReadAsync(async tx =>
                {
                    var cursor = await tx.RunAsync(cypher, new { candidateIds });
                    return await cursor.ToListAsync();
                });

                foreach (var record in neoResult)
                {
                    var sourceId = record["sourceId"].As<string>();
                    var targetContent = record["targetContent"].As<string>();

                    if (!definitions.TryGetValue(sourceId, out var targets))
                    {
                        targets = new List<string>();
                        definitions[sourceId] = targets;
                    }

                    targets.Add(targetContent);
                }
            }
            catch (Exception ex)
            {
                // Graph context is an enrichment only; the vector hits are still returned.
                _logger.LogWarning(ex, "[KnowledgeService] Neo4j graph expansion query failed.");
            }
        }

        // 4. Retrieve Ebook Titles from PostgreSQL
        var ebookIds = searchResult
            .Select(TryGetEbookId)
            .Where(id => id.HasValue)
            .Select(id => id!.Value)
            .Distinct()
            .ToList();

        var ebookTitles = new Dictionary<Guid, string>();
        if (ebookIds.Count > 0)
        {
            using var dbContext = await _dbContextFactory.CreateDbContextAsync(cancellationToken);
            ebookTitles = await dbContext.Ebooks
                .Where(e => ebookIds.Contains(e.Id))
                .ToDictionaryAsync(e => e.Id, e => e.Title, cancellationToken);
        }

        // 5. Map results to DTOs
        var dtos = searchResult.Select(point =>
        {
            var pointId = GetPointIdString(point.Id);
            var snippet = GetPayloadString(point, "content") ?? string.Empty;

            // Prepend definitions discovered through the graph so the caller sees them as context.
            if (definitions.TryGetValue(pointId, out var pointDefs) && pointDefs.Count > 0)
            {
                snippet = $"[Context: {string.Join("; ", pointDefs)}]\n{snippet}";
            }

            string? bookTitle = null;
            if (TryGetEbookId(point) is Guid ebookId)
            {
                ebookTitles.TryGetValue(ebookId, out bookTitle);
            }

            return new SemanticSearchResultDto
            {
                ContentHash = pointId,
                Snippet = snippet,
                UnitType = GetPayloadString(point, "type") ?? string.Empty,
                RelevanceScore = point.Score,
                SourceBookTitle = bookTitle,
                Metadata = ParsePointMetadata(GetPayloadString(point, "metadataJson"))
            };
        }).ToList();

        return Result.Ok(dtos);
    }
    catch (Exception ex)
    {
        return Result.Fail(new Error("Failed to search library semantically").CausedBy(ex));
    }
}

// Builds the "tenant OR global" filter. Keyword (exact) matching is used on purpose:
// a Text match is a full-text/substring match in Qdrant and could let one tenant id
// match points that belong to another tenant whose id merely contains it.
private static Qdrant.Client.Grpc.Filter BuildTenantScopeFilter(string tenantId)
{
    var filter = new Qdrant.Client.Grpc.Filter();
    foreach (var scope in new[] { tenantId, "global" })
    {
        filter.Should.Add(new Qdrant.Client.Grpc.Condition
        {
            Field = new Qdrant.Client.Grpc.FieldCondition
            {
                Key = "tenantId",
                Match = new Qdrant.Client.Grpc.Match { Keyword = scope }
            }
        });
    }

    return filter;
}

// Returns the raw point id (UUID string or decimal number). PointId.ToString() is not used
// because protobuf messages render as diagnostic JSON (e.g. { "uuid": "..." }), which would
// never equal the ids stored on Neo4j nodes.
private static string GetPointIdString(Qdrant.Client.Grpc.PointId? id)
{
    if (id is null)
    {
        return string.Empty;
    }

    return id.PointIdOptionsCase switch
    {
        Qdrant.Client.Grpc.PointId.PointIdOptionsOneofCase.Uuid => id.Uuid,
        Qdrant.Client.Grpc.PointId.PointIdOptionsOneofCase.Num => id.Num.ToString(System.Globalization.CultureInfo.InvariantCulture),
        _ => string.Empty,
    };
}

// Reads a payload entry as a string; returns null when the key is absent.
private static string? GetPayloadString(Qdrant.Client.Grpc.ScoredPoint point, string key) =>
    point.Payload.TryGetValue(key, out var value) ? value.StringValue : null;

// Parses the "ebookId" payload entry; returns null when it is absent or not a valid GUID.
private static Guid? TryGetEbookId(Qdrant.Client.Grpc.ScoredPoint point) =>
    Guid.TryParse(GetPayloadString(point, "ebookId"), out var parsed) ? parsed : null;

// Deserializes the optional metadata JSON; malformed JSON is logged and treated as "no metadata".
private Dictionary<string, object>? ParsePointMetadata(string? metadataJson)
{
    if (string.IsNullOrEmpty(metadataJson))
    {
        return null;
    }

    try
    {
        return JsonSerializer.Deserialize<Dictionary<string, object>>(metadataJson);
    }
    catch (JsonException ex)
    {
        _logger.LogDebug(ex, "[KnowledgeService] Ignoring malformed metadataJson payload on search hit.");
        return null;
    }
}
public async Task<Result> ClearCacheAsync(CancellationToken cancellationToken = default)
{
using var dbContext = await _dbContextFactory.CreateDbContextAsync(cancellationToken);