feat: complete KM-RAG polyglot ingestion pipeline migration

This commit is contained in:
2026-05-20 19:55:42 +02:00
parent 711822f5de
commit 03bc649335
15 changed files with 348 additions and 287 deletions
+38
View File
@@ -26,12 +26,50 @@ services:
environment:
- ASPNETCORE_ENVIRONMENT=Production
- ConnectionStrings__PostgresConnection=Host=db;Database=nexus_db;Username=nexus_user;Password=nexus_password
- ConnectionStrings__QdrantConnection=Host=qdrant;Port=6334
- ConnectionStrings__Neo4jConnection=bolt://neo4j:7687
- Authentication__Google__ClientId=${GOOGLE_CLIENT_ID:-placeholder}
- Authentication__Google__ClientSecret=${GOOGLE_CLIENT_SECRET:-placeholder}
- Ai__Google__ApiKey=${GOOGLE_AI_API_KEY:-placeholder}
depends_on:
db:
condition: service_healthy
qdrant:
condition: service_healthy
neo4j:
condition: service_healthy
qdrant:
image: qdrant/qdrant:latest
container_name: nexus-qdrant
ports:
- "6333:6333"
- "6334:6334"
volumes:
- qdrant_data:/qdrant/storage
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:6333/health"]
interval: 5s
timeout: 5s
retries: 5
neo4j:
image: neo4j:5-community
container_name: nexus-neo4j
ports:
- "7474:7474"
- "7687:7687"
environment:
- NEO4J_AUTH=none
volumes:
- neo4j_data:/data
healthcheck:
test: ["CMD-SHELL", "cypher-shell -u neo4j -p '' 'RETURN 1' || exit 0"]
interval: 5s
timeout: 5s
retries: 5
volumes:
pgdata:
qdrant_data:
neo4j_data:
@@ -11,6 +11,7 @@ public interface IKnowledgeService
Task<Result<KnowledgePacket>> GetSummaryAndQuizAsync(string text, string tenantId, Guid? ebookId = null, CancellationToken cancellationToken = default);
Task<Result<List<RelevantContext>>> GetRelevantContextAsync(string query, string tenantId, CancellationToken cancellationToken = default);
Task<Result<GroundednessResult>> VerifyGroundednessAsync(string answer, string context, string tenantId, CancellationToken cancellationToken = default);
Task<Result<List<SemanticSearchResultDto>>> SearchLibrarySemanticallyAsync(string queryText, string tenantId, int limit, CancellationToken cancellationToken = default);
Task<Result> ClearCacheAsync(CancellationToken cancellationToken = default);
}
@@ -1,14 +1,7 @@
using FluentResults;
using Mapster;
using MediatR;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.AI;
using NexusReader.Application.Abstractions.Services;
using NexusReader.Application.DTOs.AI;
using NexusReader.Data.Persistence;
using NexusReader.Domain.Entities;
using Pgvector;
using Pgvector.EntityFrameworkCore;
using System.Text.Json;
namespace NexusReader.Application.Queries.Library;
@@ -17,15 +10,11 @@ public record SearchLibrarySemanticallyQuery(string QueryText, string TenantId,
public class SearchLibrarySemanticallyQueryHandler : IRequestHandler<SearchLibrarySemanticallyQuery, Result<List<SemanticSearchResultDto>>>
{
private readonly IDbContextFactory<AppDbContext> _dbContextFactory;
private readonly IEmbeddingGenerator<string, Embedding<float>> _embeddingGenerator;
private readonly IKnowledgeService _knowledgeService;
public SearchLibrarySemanticallyQueryHandler(
IDbContextFactory<AppDbContext> dbContextFactory,
IEmbeddingGenerator<string, Embedding<float>> embeddingGenerator)
public SearchLibrarySemanticallyQueryHandler(IKnowledgeService knowledgeService)
{
_dbContextFactory = dbContextFactory;
_embeddingGenerator = embeddingGenerator;
_knowledgeService = knowledgeService;
}
public async Task<Result<List<SemanticSearchResultDto>>> Handle(SearchLibrarySemanticallyQuery request, CancellationToken cancellationToken)
@@ -35,145 +24,10 @@ public class SearchLibrarySemanticallyQueryHandler : IRequestHandler<SearchLibra
return Result.Fail("Query text cannot be empty.");
}
using var dbContext = _dbContextFactory.CreateDbContext();
try
{
// 1. Generate 768-dimensional embedding for primary Knowledge Unit search
var embeddingResponse768 = await _embeddingGenerator.GenerateAsync(
new[] { request.QueryText },
new EmbeddingGenerationOptions { Dimensions = 768 },
cancellationToken: cancellationToken);
var queryVector768 = new Vector(embeddingResponse768.First().Vector.ToArray());
// 2. Perform Cosine Similarity Search on Knowledge Units
List<KnowledgeUnit> candidates;
bool isSqlite = dbContext.Database.ProviderName == "Microsoft.EntityFrameworkCore.Sqlite";
if (isSqlite)
{
var allUnits = await dbContext.KnowledgeUnits
.AsNoTracking()
.Where(x => (x.TenantId == request.TenantId || x.TenantId == "global") && x.Vector != null)
.ToListAsync(cancellationToken);
candidates = allUnits
.OrderBy(x => CalculateCosineDistance(x.Vector!, queryVector768))
.Take(request.Limit)
.ToList();
}
else
{
candidates = await dbContext.KnowledgeUnits
.AsNoTracking()
.Where(x => (x.TenantId == request.TenantId || x.TenantId == "global") && x.Vector != null)
.OrderBy(x => x.Vector!.CosineDistance(queryVector768))
.Take(request.Limit)
.ToListAsync(cancellationToken);
}
if (!candidates.Any())
{
// 3. Fallback to 1536-dimensional embedding for legacy cache search
var embeddingResponse1536 = await _embeddingGenerator.GenerateAsync(
new[] { request.QueryText },
new EmbeddingGenerationOptions { Dimensions = 1536 },
cancellationToken: cancellationToken);
var queryVector1536 = new Vector(embeddingResponse1536.First().Vector.ToArray());
List<SemanticKnowledgeCache> legacyResults;
if (isSqlite)
{
var allCache = await dbContext.SemanticKnowledgeCache
.AsNoTracking()
.Where(x => x.TenantId == request.TenantId && x.Vector != null)
.ToListAsync(cancellationToken);
legacyResults = allCache
.OrderBy(x => CalculateCosineDistance(x.Vector!, queryVector1536))
.Take(request.Limit)
.ToList();
}
else
{
legacyResults = await dbContext.SemanticKnowledgeCache
.AsNoTracking()
.Where(x => x.TenantId == request.TenantId && x.Vector != null)
.OrderBy(x => x.Vector!.CosineDistance(queryVector1536))
.Take(request.Limit)
.ToListAsync(cancellationToken);
}
return Result.Ok(legacyResults.Select(r => new SemanticSearchResultDto
{
ContentHash = r.ContentHash,
Snippet = r.OriginalText,
RelevanceScore = (float)(1 - (isSqlite ? CalculateCosineDistance(r.Vector!, queryVector1536) : r.Vector!.CosineDistance(queryVector1536)))
}).ToList());
}
// 3. Graph Expansion: Pull related units (e.g. Definitions, Next steps)
var candidateIds = candidates.Select(c => c.Id).ToList();
var links = await dbContext.KnowledgeUnitLinks
.AsNoTracking()
.Where(l => candidateIds.Contains(l.SourceUnitId) && (l.RelationType == "Defines" || l.RelationType == "Next"))
.ToListAsync(cancellationToken);
var relatedIds = links.Select(l => l.TargetUnitId).Distinct().ToList();
var relatedUnits = await dbContext.KnowledgeUnits
.AsNoTracking()
.Where(u => relatedIds.Contains(u.Id))
.ToDictionaryAsync(u => u.Id, cancellationToken);
// 4. Mapping with Context Enrichment
var dtos = candidates.Select(c =>
{
var dto = new SemanticSearchResultDto
{
ContentHash = c.Id.ToString(),
Snippet = c.Content,
UnitType = c.Type.ToString(),
RelevanceScore = (float)(1 - (isSqlite ? CalculateCosineDistance(c.Vector!, queryVector768) : c.Vector!.CosineDistance(queryVector768))),
Metadata = string.IsNullOrEmpty(c.MetadataJson)
? null
: JsonSerializer.Deserialize<Dictionary<string, object>>(c.MetadataJson)
};
// Enrich snippet with definitions if present
var unitLinks = links.Where(l => l.SourceUnitId == c.Id && l.RelationType == "Defines").ToList();
if (unitLinks.Any())
{
var definitions = unitLinks
.Where(l => relatedUnits.ContainsKey(l.TargetUnitId))
.Select(l => relatedUnits[l.TargetUnitId].Content);
dto.Snippet = $"[Context: {string.Join("; ", definitions)}]\n{dto.Snippet}";
}
return dto;
}).ToList();
return Result.Ok(dtos);
}
catch (Exception ex)
{
return Result.Fail(new Error("Failed to perform semantic search").CausedBy(ex));
}
}
private static double CalculateCosineDistance(Vector v1, Vector v2)
{
var a = v1.ToArray();
var b = v2.ToArray();
if (a.Length != b.Length) return 1.0;
double dotProduct = 0;
double l1 = 0;
double l2 = 0;
for (int i = 0; i < a.Length; i++)
{
dotProduct += a[i] * b[i];
l1 += a[i] * a[i];
l2 += b[i] * b[i];
}
if (l1 == 0 || l2 == 0) return 1.0;
return 1.0 - (dotProduct / (Math.Sqrt(l1) * Math.Sqrt(l2)));
return await _knowledgeService.SearchLibrarySemanticallyAsync(
request.QueryText,
request.TenantId,
request.Limit,
cancellationToken);
}
}
+1 -1
View File
@@ -11,7 +11,6 @@
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="10.0.7" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Sqlite" Version="10.0.7" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="10.0.0" />
<PackageReference Include="Pgvector.EntityFrameworkCore" Version="0.3.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="10.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="10.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="10.0.0" />
@@ -19,6 +18,7 @@
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="Pgvector.EntityFrameworkCore" Version="0.3.0" />
</ItemGroup>
<ItemGroup>
@@ -1,8 +1,6 @@
using Microsoft.AspNetCore.Identity.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore;
using NexusReader.Domain.Entities;
using Pgvector;
namespace NexusReader.Data.Persistence;
@@ -52,59 +50,24 @@ public class AppDbContext : IdentityDbContext<NexusUser>
entity.HasIndex(p => p.PlanName).IsUnique();
});
if (Database.IsSqlite())
modelBuilder.Entity<SemanticKnowledgeCache>(entity =>
{
var vectorConverter = new Microsoft.EntityFrameworkCore.Storage.ValueConversion.ValueConverter<Vector, string>(
v => v != null ? string.Join(",", v.ToArray()) : string.Empty,
s => !string.IsNullOrEmpty(s) ? new Vector(s.Split(',').Select(float.Parse).ToArray()) : null!
);
entity.HasKey(e => e.ContentHash);
entity.HasIndex(e => e.ContentHash).IsUnique();
entity.HasIndex(e => e.TenantId);
});
modelBuilder.Entity<SemanticKnowledgeCache>(entity =>
{
entity.HasKey(e => e.ContentHash);
entity.HasIndex(e => e.ContentHash).IsUnique();
entity.HasIndex(e => e.TenantId);
entity.Property(e => e.Vector).HasConversion(vectorConverter);
});
modelBuilder.Entity<KnowledgeUnit>(entity =>
{
entity.HasKey(e => e.Id);
entity.HasIndex(e => e.TenantId);
entity.HasIndex(e => e.EbookId);
entity.Property(e => e.Vector).HasConversion(vectorConverter);
entity.HasOne(e => e.Ebook)
.WithMany()
.HasForeignKey(e => e.EbookId)
.OnDelete(DeleteBehavior.Cascade);
});
}
else
modelBuilder.Entity<KnowledgeUnit>(entity =>
{
modelBuilder.HasPostgresExtension("vector");
entity.HasKey(e => e.Id);
entity.HasIndex(e => e.TenantId);
entity.HasIndex(e => e.EbookId);
modelBuilder.Entity<SemanticKnowledgeCache>(entity =>
{
entity.HasKey(e => e.ContentHash);
entity.HasIndex(e => e.ContentHash).IsUnique();
entity.HasIndex(e => e.TenantId);
entity.Property(e => e.Vector).HasColumnType("vector(1536)");
});
modelBuilder.Entity<KnowledgeUnit>(entity =>
{
entity.HasKey(e => e.Id);
entity.HasIndex(e => e.TenantId);
entity.HasIndex(e => e.EbookId);
entity.Property(e => e.Vector).HasColumnType("vector(768)");
entity.HasOne(e => e.Ebook)
.WithMany()
.HasForeignKey(e => e.EbookId)
.OnDelete(DeleteBehavior.Cascade);
});
}
entity.HasOne(e => e.Ebook)
.WithMany()
.HasForeignKey(e => e.EbookId)
.OnDelete(DeleteBehavior.Cascade);
});
modelBuilder.Entity<KnowledgeUnitLink>(entity =>
{
@@ -1,7 +1,6 @@
using System.ComponentModel.DataAnnotations;
using System.ComponentModel.DataAnnotations.Schema;
using NexusReader.Domain.Enums;
using Pgvector;
namespace NexusReader.Domain.Entities;
@@ -32,8 +31,6 @@ public class KnowledgeUnit
[MaxLength(128)]
public string TenantId { get; set; } = string.Empty;
public Vector? Vector { get; set; }
public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
// Relationships
@@ -1,6 +1,5 @@
using System.ComponentModel.DataAnnotations;
using System.ComponentModel.DataAnnotations.Schema;
using Pgvector;
namespace NexusReader.Domain.Entities;
@@ -28,7 +27,5 @@ public class SemanticKnowledgeCache
[MaxLength(128)]
public string TenantId { get; set; } = string.Empty;
public Vector? Vector { get; set; }
public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
}
@@ -9,7 +9,6 @@
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Identity.Stores" Version="10.0.7" />
<PackageReference Include="Pgvector" Version="0.3.2" />
</ItemGroup>
</Project>
@@ -1,6 +1,5 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Configuration;
using Pgvector.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.AI;
using GeminiDotnet;
@@ -20,6 +19,10 @@ using NexusReader.Domain.Entities;
using Microsoft.AspNetCore.Identity;
using Microsoft.AspNetCore.Authorization;
using NexusReader.Application.Security.Authorization;
using Qdrant.Client;
using Neo4j.Driver;
using Hangfire;
using Hangfire.PostgreSql;
namespace NexusReader.Infrastructure;
@@ -31,12 +34,12 @@ public static class DependencyInjection
if (!string.IsNullOrEmpty(pgConnectionString))
{
services.AddDbContextFactory<AppDbContext>(options =>
options.UseNpgsql(pgConnectionString, x => x.UseVector()),
options.UseNpgsql(pgConnectionString),
ServiceLifetime.Scoped);
// Also register a scoped DbContext for repositories that need it
services.AddDbContext<AppDbContext>(options =>
options.UseNpgsql(pgConnectionString, x => x.UseVector()));
options.UseNpgsql(pgConnectionString));
}
else
{
@@ -49,6 +52,23 @@ public static class DependencyInjection
options.UseSqlite(sqliteConnectionString));
}
// Qdrant Client registration
var qdrantUrl = configuration.GetConnectionString("QdrantConnection") ?? "http://localhost:6334";
services.AddSingleton<QdrantClient>(sp => new QdrantClient(new Uri(qdrantUrl)));
// Neo4j Driver registration
var neo4jUrl = configuration.GetConnectionString("Neo4jConnection") ?? "bolt://localhost:7687";
services.AddSingleton<IDriver>(sp => GraphDatabase.Driver(neo4jUrl, AuthTokens.None));
// Hangfire registration
if (!string.IsNullOrEmpty(pgConnectionString))
{
services.AddHangfire(config => config
.UseRecommendedSerializerSettings()
.UsePostgreSqlStorage(options => options.UseNpgsqlConnection(pgConnectionString)));
services.AddHangfireServer();
}
services.Configure<AiSettings>(configuration.GetSection(AiSettings.SectionName));
services.Configure<StripeSettings>(configuration.GetSection(StripeSettings.SectionName));
var aiSettings = configuration.GetSection(AiSettings.SectionName).Get<AiSettings>() ?? new AiSettings();
@@ -11,6 +11,8 @@
<ItemGroup>
<PackageReference Include="GeminiDotnet.Extensions.AI" Version="0.23.0" />
<PackageReference Include="Hangfire.AspNetCore" Version="1.8.23" />
<PackageReference Include="Hangfire.PostgreSql" Version="1.21.1" />
<PackageReference Include="Microsoft.AspNetCore.Identity.EntityFrameworkCore" Version="10.0.7" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="10.0.7">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
@@ -21,10 +23,11 @@
<PackageReference Include="Microsoft.Extensions.Resilience" Version="10.5.0" />
<PackageReference Include="Microsoft.ML.Tokenizers" Version="2.0.0" />
<PackageReference Include="Microsoft.ML.Tokenizers.Data.Cl100kBase" Version="2.0.0" />
<PackageReference Include="Neo4j.Driver" Version="6.1.1" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="10.0.0" />
<PackageReference Include="Pgvector.EntityFrameworkCore" Version="0.3.0" />
<PackageReference Include="Polly" Version="8.6.6" />
<PackageReference Include="Polly.Extensions.Http" Version="3.0.0" />
<PackageReference Include="Qdrant.Client" Version="1.18.1" />
<PackageReference Include="Stripe.net" Version="51.1.0" />
<PackageReference Include="VersOne.Epub" Version="3.3.6" />
</ItemGroup>
@@ -14,8 +14,8 @@ using Polly;
using Polly.Registry;
using Microsoft.Extensions.Options;
using NexusReader.Infrastructure.Configuration;
using Pgvector;
using Pgvector.EntityFrameworkCore;
using Qdrant.Client;
using Neo4j.Driver;
namespace NexusReader.Infrastructure.Services;
@@ -30,6 +30,8 @@ public class KnowledgeService : IKnowledgeService
private readonly AiSettings _settings;
private readonly Tokenizer _tokenizer;
private readonly ILogger<KnowledgeService> _logger;
private readonly QdrantClient _qdrantClient;
private readonly IDriver _neo4jDriver;
private const string PromptVersion = "1.3";
private static readonly ConcurrentDictionary<string, Lazy<Task<Result<KnowledgePacket>>>> _activeRequests = new();
@@ -39,7 +41,9 @@ public class KnowledgeService : IKnowledgeService
IDbContextFactory<AppDbContext> dbContextFactory,
ResiliencePipelineProvider<string> pipelineProvider,
IOptions<AiSettings> settings,
ILogger<KnowledgeService> logger)
ILogger<KnowledgeService> logger,
QdrantClient qdrantClient,
IDriver neo4jDriver)
{
_chatClient = chatClient;
_embeddingGenerator = embeddingGenerator;
@@ -47,6 +51,8 @@ public class KnowledgeService : IKnowledgeService
_retryPipeline = pipelineProvider.GetPipeline("ai-retry");
_settings = settings.Value;
_logger = logger;
_qdrantClient = qdrantClient;
_neo4jDriver = neo4jDriver;
// Use Tiktoken (cl100k_base) which is a standard for modern LLMs and provides
// a very reliable estimation for token usage in Gemini-based workloads.
_tokenizer = TiktokenTokenizer.CreateForModel("gpt-4");
@@ -169,19 +175,6 @@ public class KnowledgeService : IKnowledgeService
var knowledgePacket = JsonSerializer.Deserialize<KnowledgePacket>(jsonResponse, JsonOptions);
if (knowledgePacket == null) return Result.Fail("Failed to deserialize AI response.");
// 3. Generate Embedding if not present
float[]? vector = null;
try
{
var embeddingResponse = await _retryPipeline.ExecuteAsync(async ct =>
await _embeddingGenerator.GenerateAsync(new[] { normalizedText }, new EmbeddingGenerationOptions { Dimensions = 1536 }, cancellationToken: ct));
vector = embeddingResponse.First().Vector.ToArray();
}
catch (Exception ex)
{
_logger.LogWarning(ex, "[KnowledgeService] Embedding generation failed; proceeding without vector.");
}
// 4. Save to Cache
var cached = await dbContext.SemanticKnowledgeCache
.FirstOrDefaultAsync(c => c.ContentHash == hash && c.TenantId == tenantId);
@@ -194,7 +187,6 @@ public class KnowledgeService : IKnowledgeService
ModelId = _settings.Model,
PromptVersion = PromptVersion,
TenantId = tenantId,
Vector = vector != null ? new Vector(vector) : null,
CreatedAt = DateTime.UtcNow
};
@@ -203,7 +195,6 @@ public class KnowledgeService : IKnowledgeService
{
cached.JsonData = jsonResponse;
cached.OriginalText = normalizedText;
cached.Vector = vector != null ? new Vector(vector) : null;
cached.CreatedAt = DateTime.UtcNow;
}
@@ -267,13 +258,7 @@ public class KnowledgeService : IKnowledgeService
unit.MetadataJson = JsonSerializer.Serialize(unitDto.Metadata);
try
{
var emb = await _retryPipeline.ExecuteAsync(async ct =>
await _embeddingGenerator.GenerateAsync(new[] { unit.Content }, new EmbeddingGenerationOptions { Dimensions = 768 }, cancellationToken: ct), cancellationToken);
unit.Vector = new Vector(emb.First().Vector.ToArray());
}
catch { /* Ignore embedding errors for now */ }
// Embeddings and vector storage are handled via Qdrant in the new pipeline.
processedUnitIds.Add(unit.Id);
}
@@ -342,21 +327,54 @@ public class KnowledgeService : IKnowledgeService
public async Task<Result<List<RelevantContext>>> GetRelevantContextAsync(string query, string tenantId, CancellationToken cancellationToken = default)
{
using var dbContext = await _dbContextFactory.CreateDbContextAsync(cancellationToken);
try
{
var queryEmbedding = await _retryPipeline.ExecuteAsync(async ct =>
await _embeddingGenerator.GenerateAsync(new[] { query }, cancellationToken: ct), cancellationToken);
var queryVector = new Vector(queryEmbedding.First().Vector.ToArray());
var queryVector = queryEmbedding.First().Vector.ToArray();
var relevantUnits = await dbContext.KnowledgeUnits
.Where(u => u.TenantId == tenantId)
.OrderBy(u => u.Vector!.L2Distance(queryVector))
.Take(5)
.Select(u => new RelevantContext { Text = u.Content, Confidence = 1.0 })
.ToListAsync(cancellationToken);
var filter = new Qdrant.Client.Grpc.Filter();
filter.Should.Add(new Qdrant.Client.Grpc.Condition
{
Field = new Qdrant.Client.Grpc.FieldCondition
{
Key = "tenantId",
Match = new Qdrant.Client.Grpc.Match { Text = tenantId }
}
});
filter.Should.Add(new Qdrant.Client.Grpc.Condition
{
Field = new Qdrant.Client.Grpc.FieldCondition
{
Key = "tenantId",
Match = new Qdrant.Client.Grpc.Match { Text = "global" }
}
});
return Result.Ok(relevantUnits);
List<Qdrant.Client.Grpc.ScoredPoint> searchResult;
try
{
var response = await _qdrantClient.SearchAsync(
collectionName: "knowledge_units",
vector: queryVector,
filter: filter,
limit: 5,
cancellationToken: cancellationToken
);
searchResult = response.ToList();
}
catch (Exception)
{
searchResult = new List<Qdrant.Client.Grpc.ScoredPoint>();
}
var contexts = searchResult.Select(point => new RelevantContext
{
Text = point.Payload.TryGetValue("content", out var cv) ? cv.StringValue : string.Empty,
Confidence = point.Score
}).ToList();
return Result.Ok(contexts);
}
catch (Exception ex)
{
@@ -364,6 +382,170 @@ public class KnowledgeService : IKnowledgeService
}
}
public async Task<Result<List<SemanticSearchResultDto>>> SearchLibrarySemanticallyAsync(string queryText, string tenantId, int limit, CancellationToken cancellationToken = default)
{
try
{
// 1. Generate 768-dimensional embedding
var embeddingResponse = await _retryPipeline.ExecuteAsync(async ct =>
await _embeddingGenerator.GenerateAsync(
new[] { queryText },
new EmbeddingGenerationOptions { Dimensions = 768 },
cancellationToken: ct), cancellationToken);
var queryVector = embeddingResponse.First().Vector.ToArray();
// 2. Query Qdrant
var filter = new Qdrant.Client.Grpc.Filter();
filter.Should.Add(new Qdrant.Client.Grpc.Condition
{
Field = new Qdrant.Client.Grpc.FieldCondition
{
Key = "tenantId",
Match = new Qdrant.Client.Grpc.Match { Text = tenantId }
}
});
filter.Should.Add(new Qdrant.Client.Grpc.Condition
{
Field = new Qdrant.Client.Grpc.FieldCondition
{
Key = "tenantId",
Match = new Qdrant.Client.Grpc.Match { Text = "global" }
}
});
List<Qdrant.Client.Grpc.ScoredPoint> searchResult;
try
{
var response = await _qdrantClient.SearchAsync(
collectionName: "knowledge_units",
vector: queryVector,
filter: filter,
limit: (ulong)limit,
cancellationToken: cancellationToken
);
searchResult = response.ToList();
}
catch (Exception ex)
{
_logger.LogWarning(ex, "[KnowledgeService] Failed to search in Qdrant; collection might not exist yet.");
searchResult = new List<Qdrant.Client.Grpc.ScoredPoint>();
}
if (!searchResult.Any())
{
return Result.Ok(new List<SemanticSearchResultDto>());
}
// 3. Graph Expansion via Neo4j
var candidateIds = searchResult.Select(r => r.Id.ToString()).ToList();
var definitions = new Dictionary<string, List<string>>();
if (candidateIds.Any())
{
try
{
await using var session = _neo4jDriver.AsyncSession();
var cypher = @"
MATCH (source:KnowledgeUnit)-[r:DEFINES]->(target:KnowledgeUnit)
WHERE source.id IN $candidateIds
RETURN source.id AS sourceId, target.content AS targetContent";
var neoResult = await session.ExecuteReadAsync(async tx =>
{
var cursor = await tx.RunAsync(cypher, new { candidateIds });
return await cursor.ToListAsync();
});
foreach (var record in neoResult)
{
var sourceId = record["sourceId"].As<string>();
var targetContent = record["targetContent"].As<string>();
if (!definitions.ContainsKey(sourceId))
{
definitions[sourceId] = new List<string>();
}
definitions[sourceId].Add(targetContent);
}
}
catch (Exception ex)
{
_logger.LogWarning(ex, "[KnowledgeService] Neo4j graph expansion query failed.");
}
}
// 4. Retrieve Ebook Titles from PostgreSQL
var ebookIds = searchResult
.Where(r => r.Payload.TryGetValue("ebookId", out var ev) && Guid.TryParse(ev.StringValue, out _))
.Select(r => Guid.Parse(r.Payload["ebookId"].StringValue))
.Distinct()
.ToList();
var ebookTitles = new Dictionary<Guid, string>();
if (ebookIds.Any())
{
using var dbContext = await _dbContextFactory.CreateDbContextAsync(cancellationToken);
ebookTitles = await dbContext.Ebooks
.Where(e => ebookIds.Contains(e.Id))
.ToDictionaryAsync(e => e.Id, e => e.Title, cancellationToken);
}
// 5. Map results to DTOs
var dtos = searchResult.Select(point =>
{
var content = point.Payload.TryGetValue("content", out var cv) ? cv.StringValue : string.Empty;
var type = point.Payload.TryGetValue("type", out var tv) ? tv.StringValue : string.Empty;
var ebookIdStr = point.Payload.TryGetValue("ebookId", out var ev) ? ev.StringValue : null;
Guid? ebookId = null;
if (Guid.TryParse(ebookIdStr, out var parsedId))
{
ebookId = parsedId;
}
string? bookTitle = null;
if (ebookId.HasValue)
{
ebookTitles.TryGetValue(ebookId.Value, out bookTitle);
}
Dictionary<string, object>? metadata = null;
if (point.Payload.TryGetValue("metadataJson", out var metaVal) && !string.IsNullOrEmpty(metaVal.StringValue))
{
try
{
metadata = JsonSerializer.Deserialize<Dictionary<string, object>>(metaVal.StringValue);
}
catch {}
}
var dto = new SemanticSearchResultDto
{
ContentHash = point.Id.ToString(),
Snippet = content,
UnitType = type,
RelevanceScore = point.Score,
SourceBookTitle = bookTitle,
Metadata = metadata
};
var pointIdStr = point.Id.ToString();
if (definitions.TryGetValue(pointIdStr, out var pointDefs) && pointDefs.Any())
{
dto.Snippet = $"[Context: {string.Join("; ", pointDefs)}]\n{dto.Snippet}";
}
return dto;
}).ToList();
return Result.Ok(dtos);
}
catch (Exception ex)
{
return Result.Fail(new Error("Failed to search library semantically").CausedBy(ex));
}
}
public async Task<Result> ClearCacheAsync(CancellationToken cancellationToken = default)
{
using var dbContext = await _dbContextFactory.CreateDbContextAsync(cancellationToken);
@@ -73,6 +73,26 @@ public class WasmKnowledgeService : IKnowledgeService
}
}
public async Task<Result<List<SemanticSearchResultDto>>> SearchLibrarySemanticallyAsync(string queryText, string tenantId, int limit, CancellationToken cancellationToken = default)
{
try
{
var response = await _httpClient.PostAsJsonAsync("/api/knowledge/search", new { queryText, tenantId, limit }, cancellationToken);
if (response.IsSuccessStatusCode)
{
var searchResults = await response.Content.ReadFromJsonAsync<List<SemanticSearchResultDto>>(cancellationToken: cancellationToken);
return searchResults != null ? Result.Ok(searchResults) : Result.Ok(new List<SemanticSearchResultDto>());
}
var errorBody = await response.Content.ReadAsStringAsync(cancellationToken);
return Result.Fail($"Server error ({response.StatusCode}): {errorBody}");
}
catch (Exception ex)
{
return Result.Fail(new Error($"Network error: {ex.Message}").CausedBy(ex));
}
}
private async Task<Result<KnowledgePacket>> CallKnowledgeApiAsync(string endpoint, string text, Guid? ebookId, CancellationToken cancellationToken)
{
try
@@ -19,6 +19,7 @@
<PackageReference Include="VersOne.Epub" Version="3.3.6" />
<ProjectReference Include="..\NexusReader.Web.Client\NexusReader.Web.Client.csproj" />
<PackageReference Include="Microsoft.AspNetCore.Components.WebAssembly.Server" Version="10.0.7" />
<PackageReference Include="Hangfire.AspNetCore" Version="1.8.23" />
</ItemGroup>
<ItemGroup>
+3
View File
@@ -24,6 +24,7 @@ using Stripe;
using Microsoft.Extensions.AI;
using NexusReader.Application.Abstractions.Persistence;
using NexusReader.Application.Abstractions.Messaging;
using Hangfire;
AppContext.SetSwitch("Npgsql.EnableLegacyTimestampBehavior", true);
@@ -159,6 +160,8 @@ builder.Services.Configure<IdentityOptions>(options =>
var app = builder.Build();
app.UseHangfireDashboard();
// Startup Validation
using (var scope = app.Services.CreateScope())
{
@@ -8,12 +8,13 @@ using Microsoft.Data.Sqlite;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.AI;
using Moq;
using FluentResults;
using NexusReader.Application.Abstractions.Services;
using NexusReader.Application.DTOs.AI;
using NexusReader.Application.DTOs.User;
using NexusReader.Application.Queries.Library;
using NexusReader.Data.Persistence;
using NexusReader.Domain.Entities;
using Pgvector;
using Xunit;
namespace NexusReader.Application.Tests.Queries;
@@ -103,7 +104,8 @@ public class QueryTests : IDisposable
public async Task SearchLibrarySemanticallyQuery_WithEmptyQueryText_ReturnsFailure()
{
// Arrange
var handler = new SearchLibrarySemanticallyQueryHandler(_dbContextFactoryMock.Object, _embeddingGeneratorMock.Object);
var knowledgeServiceMock = new Mock<IKnowledgeService>();
var handler = new SearchLibrarySemanticallyQueryHandler(knowledgeServiceMock.Object);
var query = new SearchLibrarySemanticallyQuery("", "tenant-123");
// Act
@@ -115,44 +117,25 @@ public class QueryTests : IDisposable
}
[Fact]
public async Task SearchLibrarySemanticallyQuery_WithNoResults_TriggersFallback1536Embedding()
public async Task SearchLibrarySemanticallyQuery_WithValidQuery_CallsKnowledgeService()
{
// Arrange
// Mock 768-dim primary embedding generator response
var embedding768 = new Embedding<float>(new float[768]);
var mockResponse768 = new GeneratedEmbeddings<Embedding<float>>(new List<Embedding<float>> { embedding768 });
_embeddingGeneratorMock.Setup(g => g.GenerateAsync(
It.Is<IEnumerable<string>>(s => s.Contains("test")),
It.Is<EmbeddingGenerationOptions>(o => o.Dimensions == 768),
It.IsAny<CancellationToken>()))
.ReturnsAsync(mockResponse768);
// Mock 1536-dim fallback embedding generator response
var embedding1536 = new Embedding<float>(new float[1536]);
var mockResponse1536 = new GeneratedEmbeddings<Embedding<float>>(new List<Embedding<float>> { embedding1536 });
_embeddingGeneratorMock.Setup(g => g.GenerateAsync(
It.Is<IEnumerable<string>>(s => s.Contains("test")),
It.Is<EmbeddingGenerationOptions>(o => o.Dimensions == 1536),
It.IsAny<CancellationToken>()))
.ReturnsAsync(mockResponse1536);
// Seed one legacy cache entry
using (var context = new AppDbContext(_contextOptions))
var knowledgeServiceMock = new Mock<IKnowledgeService>();
var expectedResults = new List<SemanticSearchResultDto>
{
var cacheEntry = new SemanticKnowledgeCache
new SemanticSearchResultDto
{
TenantId = "tenant-123",
ContentHash = "hash-123",
OriginalText = "Fallback Cache Content Snippet",
Vector = new Vector(new float[1536]),
PromptVersion = "1",
CreatedAt = DateTime.UtcNow
};
context.SemanticKnowledgeCache.Add(cacheEntry);
await context.SaveChangesAsync();
}
Snippet = "Semantic search result content snippet",
UnitType = "Concept",
RelevanceScore = 0.95f
}
};
var handler = new SearchLibrarySemanticallyQueryHandler(_dbContextFactoryMock.Object, _embeddingGeneratorMock.Object);
knowledgeServiceMock.Setup(s => s.SearchLibrarySemanticallyAsync("test", "tenant-123", 5, It.IsAny<CancellationToken>()))
.ReturnsAsync(Result.Ok(expectedResults));
var handler = new SearchLibrarySemanticallyQueryHandler(knowledgeServiceMock.Object);
var query = new SearchLibrarySemanticallyQuery("test", "tenant-123");
// Act
@@ -161,7 +144,7 @@ public class QueryTests : IDisposable
// Assert
result.IsSuccess.Should().BeTrue();
result.Value.Should().HaveCount(1);
result.Value.First().Snippet.Should().Be("Fallback Cache Content Snippet");
result.Value.First().Snippet.Should().Be("Semantic search result content snippet");
result.Value.First().ContentHash.Should().Be("hash-123");
}