using System.Text.Json;
using System.Collections.Concurrent;
using FluentResults;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.AI;
using Microsoft.Extensions.Logging;
using Microsoft.ML.Tokenizers;
using NexusReader.Application.Abstractions.Services;
using NexusReader.Application.DTOs.AI;
using NexusReader.Domain.Entities;
using NexusReader.Infrastructure.Helpers;
using NexusReader.Data.Persistence;
using Polly;
using Polly.Registry;
using Microsoft.Extensions.Options;
using NexusReader.Infrastructure.Configuration;
using Qdrant.Client;
using Qdrant.Client.Grpc;
using Neo4j.Driver;

namespace NexusReader.Infrastructure.Services;

/// <summary>
/// Extracts structured knowledge from text with an LLM, caches the result by content hash,
/// and mirrors the extracted units into PostgreSQL (EF Core), Qdrant (vectors) and Neo4j (graph).
/// Also serves semantic search and grounded question answering over that data.
/// </summary>
public class KnowledgeService : IKnowledgeService
{
    private const string PromptVersion = "1.7";
    private const string KnowledgeCollectionName = "knowledge_units";
    private const string GlobalTenantId = "global";
    private const string DefaultRelationType = "RELATED_TO";
    private const string NoAnswerMessage = "I cannot answer this based on the provided book context.";

    // Must match the vector size the Qdrant collection is created with.
    private const int EmbeddingDimensions = 768;

    private static readonly JsonSerializerOptions JsonOptions = new() { PropertyNameCaseInsensitive = true };

    // In-flight AI requests keyed by "{hash}:{traceType}" so identical concurrent requests share one call.
    private static readonly ConcurrentDictionary<string, Lazy<Task<Result<KnowledgePacket>>>> _activeRequests = new();
    private static readonly SemaphoreSlim _collectionSemaphore = new(1, 1);

    // Anything outside A-Z, 0-9 and '_' is replaced before a relation type is spliced into Cypher.
    private static readonly System.Text.RegularExpressions.Regex RelationTypeSanitizer =
        new(@"[^A-Z0-9_]", System.Text.RegularExpressions.RegexOptions.Compiled);

    private readonly IChatClient _chatClient;
    private readonly IEmbeddingGenerator<string, Embedding<float>> _embeddingGenerator;
    private readonly IDbContextFactory<AppDbContext> _dbContextFactory;
    private readonly ResiliencePipeline _retryPipeline;
    private readonly AiSettings _settings;
    private readonly Tokenizer _tokenizer;
    private readonly ILogger<KnowledgeService> _logger;
    private readonly QdrantClient _qdrantClient;
    private readonly IDriver _neo4jDriver;

    /// <summary>
    /// Wires up the AI, persistence, vector and graph dependencies and resolves the "ai-retry" pipeline.
    /// </summary>
    public KnowledgeService(
        IChatClient chatClient,
        IEmbeddingGenerator<string, Embedding<float>> embeddingGenerator,
        IDbContextFactory<AppDbContext> dbContextFactory,
        ResiliencePipelineProvider<string> pipelineProvider,
        IOptions<AiSettings> settings,
        ILogger<KnowledgeService> logger,
        QdrantClient qdrantClient,
        IDriver neo4jDriver)
    {
        _chatClient = chatClient;
        _embeddingGenerator = embeddingGenerator;
        _dbContextFactory = dbContextFactory;
        _retryPipeline = pipelineProvider.GetPipeline("ai-retry");
        _settings = settings.Value;
        _logger = logger;
        _qdrantClient = qdrantClient;
        _neo4jDriver = neo4jDriver;

        // Use Tiktoken (cl100k_base) which is a standard for modern LLMs and provides
        // a very reliable estimation for token usage in Gemini-based workloads.
        _tokenizer = TiktokenTokenizer.CreateForModel("gpt-4");
    }

    /// <summary>Runs the full knowledge-extraction prompt over <paramref name="text"/>.</summary>
    public async Task<Result<KnowledgePacket>> GetKnowledgeAsync(string text, string tenantId, Guid? ebookId = null, CancellationToken cancellationToken = default)
    {
        return await GetKnowledgeInternalAsync(text, tenantId, PromptRegistry.KnowledgeExtractionSystemPrompt, "full", ebookId, cancellationToken);
    }

    /// <summary>Runs the graph-extraction prompt over <paramref name="text"/>.</summary>
    public async Task<Result<KnowledgePacket>> GetGraphDataAsync(string text, string tenantId, Guid? ebookId = null, CancellationToken cancellationToken = default)
    {
        return await GetKnowledgeInternalAsync(text, tenantId, PromptRegistry.GraphExtractionPrompt, "graph", ebookId, cancellationToken);
    }

    /// <summary>Runs the summary-and-quiz prompt over <paramref name="text"/>.</summary>
    public async Task<Result<KnowledgePacket>> GetSummaryAndQuizAsync(string text, string tenantId, Guid? ebookId = null, CancellationToken cancellationToken = default)
    {
        return await GetKnowledgeInternalAsync(text, tenantId, PromptRegistry.SummaryAndQuizPrompt, "summary_quiz", ebookId, cancellationToken);
    }

    /// <summary>Runs the knowledge-map prompt over <paramref name="text"/>.</summary>
    public async Task<Result<KnowledgePacket>> GetKnowledgeMapAsync(string text, string tenantId, Guid? ebookId = null, CancellationToken cancellationToken = default)
    {
        return await GetKnowledgeInternalAsync(text, tenantId, PromptRegistry.KM_ExtractionPrompt, "km_map", ebookId, cancellationToken);
    }

    /// <summary>
    /// Returns the cached packet for the text/trace-type/prompt-version hash when one exists,
    /// otherwise joins (or starts) the single in-flight AI request for that hash.
    /// </summary>
    /// <returns>The extracted packet, or a failed result when the input is blank or the AI call fails.</returns>
    /// <exception cref="OperationCanceledException">The caller's token was cancelled while waiting.</exception>
    private async Task<Result<KnowledgePacket>> GetKnowledgeInternalAsync(string text, string tenantId, string systemPrompt, string traceType, Guid? ebookId, CancellationToken cancellationToken)
    {
        if (string.IsNullOrWhiteSpace(text))
        {
            return Result.Fail<KnowledgePacket>("Input text is empty.");
        }

        var normalizedText = text.Trim();
        var hash = ContentHasher.ComputeHash($"{normalizedText}:{traceType}:{PromptVersion}");

        // 1. Check Cache. The context is scoped to the lookup so it is not held while waiting on the AI.
        await using (var dbContext = await _dbContextFactory.CreateDbContextAsync(cancellationToken))
        {
            var cached = await dbContext.SemanticKnowledgeCache
                .FirstOrDefaultAsync(c => c.ContentHash == hash, cancellationToken);

            if (cached != null && cached.PromptVersion == PromptVersion)
            {
                _logger.LogDebug("[KnowledgeService] Cache Hit for {TraceType} ({Hash})", traceType, hash);
                try
                {
                    var packet = JsonSerializer.Deserialize<KnowledgePacket>(cached.JsonData, JsonOptions);
                    if (packet != null)
                    {
                        await ProcessKnowledgeUnitsAsync(packet, tenantId, ebookId, dbContext, cancellationToken);
                        await dbContext.SaveChangesAsync(cancellationToken);
                        return Result.Ok(packet);
                    }
                }
                catch (JsonException ex)
                {
                    _logger.LogWarning(ex, "[KnowledgeService] Cached JSON for {Hash} was invalid; regenerating.", hash);
                }
            }
        }

        // 2. Deduplicate concurrent requests for the same hash. The producer removes its own entry
        //    when it finishes, so a failed request can be retried by the next caller.
        var lazyTask = _activeRequests.GetOrAdd(
            BuildRequestKey(hash, traceType),
            _ => new Lazy<Task<Result<KnowledgePacket>>>(
                () => ExecuteAiRequestAndCacheAsync(normalizedText, tenantId, systemPrompt, traceType, ebookId, hash),
                System.Threading.LazyThreadSafetyMode.ExecutionAndPublication));

        // The shared work deliberately ignores any single caller's token; only this caller's wait is cancellable.
        return await lazyTask.Value.WaitAsync(cancellationToken);
    }

    /// <summary>Builds the key under which an in-flight AI request is registered.</summary>
    private static string BuildRequestKey(string hash, string traceType) => $"{hash}:{traceType}";

    /// <summary>Removes markdown code fences that models commonly wrap around JSON output.</summary>
    private static string StripCodeFences(string text) => text.Replace("```json", "").Replace("```", "").Trim();

    /// <summary>
    /// Calls the chat model, parses the JSON packet, stores it in the semantic cache and
    /// processes its knowledge units. Never throws: every failure becomes a failed result.
    /// </summary>
    private async Task<Result<KnowledgePacket>> ExecuteAiRequestAndCacheAsync(
        string normalizedText,
        string tenantId,
        string systemPrompt,
        string traceType,
        Guid? ebookId,
        string hash)
    {
        _logger.LogInformation("[KnowledgeService] Cache Miss for {TraceType} ({Hash}). Requesting AI...", traceType, hash);

        try
        {
            var options = new ChatOptions
            {
                Temperature = (float)_settings.Temperature,
                MaxOutputTokens = _settings.MaxOutputTokens
            };
            var messages = new List<ChatMessage>
            {
                new ChatMessage(ChatRole.System, systemPrompt),
                new ChatMessage(ChatRole.User, normalizedText)
            };

            var response = await _retryPipeline.ExecuteAsync(async ct =>
                await _chatClient.GetResponseAsync(messages, options, cancellationToken: ct));

            var rawResponse = response.Text?.Trim() ?? string.Empty;
            if (string.IsNullOrWhiteSpace(rawResponse))
            {
                return Result.Fail<KnowledgePacket>("AI returned an empty response.");
            }

            // Cleanup markdown code blocks and repair truncation
            var jsonResponse = JsonRepairHelper.Repair(StripCodeFences(rawResponse));

            KnowledgePacket? knowledgePacket;
            try
            {
                knowledgePacket = JsonSerializer.Deserialize<KnowledgePacket>(jsonResponse, JsonOptions);
            }
            catch (JsonException ex)
            {
                _logger.LogError(ex, "[KnowledgeService] JSON deserialization error. Raw response length: {Length}", rawResponse.Length);
                return Result.Fail<KnowledgePacket>($"Failed to deserialize AI response: {ex.Message}");
            }

            if (knowledgePacket == null)
            {
                return Result.Fail<KnowledgePacket>("Failed to deserialize AI response.");
            }

            await using var dbContext = await _dbContextFactory.CreateDbContextAsync();

            // 4. Save to Cache
            var cached = await dbContext.SemanticKnowledgeCache
                .FirstOrDefaultAsync(c => c.ContentHash == hash);

            if (cached == null)
            {
                dbContext.SemanticKnowledgeCache.Add(new SemanticKnowledgeCache
                {
                    ContentHash = hash,
                    JsonData = jsonResponse,
                    OriginalText = normalizedText,
                    ModelId = _settings.Model,
                    PromptVersion = PromptVersion,
                    TenantId = tenantId,
                    CreatedAt = DateTime.UtcNow
                });
            }
            else
            {
                cached.JsonData = jsonResponse;
                cached.OriginalText = normalizedText;
                cached.ModelId = _settings.Model;
                cached.PromptVersion = PromptVersion;
                cached.CreatedAt = DateTime.UtcNow;
            }

            // 5. Process structured KnowledgeUnits (Graph Expansion)
            await ProcessKnowledgeUnitsAsync(knowledgePacket, tenantId, ebookId, dbContext, default);

            try
            {
                await dbContext.SaveChangesAsync();
            }
            catch (DbUpdateException ex) when (ex.InnerException is Npgsql.PostgresException pgEx && pgEx.SqlState == "23505")
            {
                // NOTE(review): the unit/link changes tracked by this context share the failed save,
                // so they are presumably not persisted on this path either - confirm that is acceptable.
                _logger.LogWarning("[KnowledgeService] Concurrency collision on SemanticKnowledgeCache for {Hash}; another process saved it first. Swallowing.", hash);
            }

            return Result.Ok(knowledgePacket);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "[KnowledgeService] AI knowledge extraction failed for {TraceType} ({Hash}).", traceType, hash);
            return Result.Fail<KnowledgePacket>(new Error("Failed to extract knowledge from AI").CausedBy(ex));
        }
        finally
        {
            // Same key the consumer registered under, so the entry is actually released.
            _activeRequests.TryRemove(BuildRequestKey(hash, traceType), out _);
        }
    }

    /// <summary>
    /// Upserts the packet's units and links into the EF context (without saving), then pushes
    /// embeddings to Qdrant and the graph to Neo4j. Qdrant and Neo4j failures are logged and swallowed.
    /// </summary>
    private async Task ProcessKnowledgeUnitsAsync(KnowledgePacket packet, string tenantId, Guid? ebookId, AppDbContext dbContext, CancellationToken cancellationToken)
    {
        // Graph-only packets carry nodes/links instead of units; project them into the unit shape.
        if (packet.Graph != null && (packet.Units == null || !packet.Units.Any()))
        {
            var graphUnits = packet.Graph.Nodes?.Select(node => new KnowledgeUnitDto(
                node.Id,
                node.Type ?? "concept",
                node.Description ?? node.Label,
                new Dictionary<string, object>
                {
                    ["label"] = node.Label,
                    ["group"] = node.Group,
                    ["summary"] = node.Summary ?? "",
                    ["key_terms"] = node.KeyTerms ?? new List<string>()
                })).ToList() ?? new List<KnowledgeUnitDto>();

            var graphLinks = packet.Graph.Links?.Select(link => new KnowledgeLinkDto(
                link.Source,
                link.Target,
                link.RelationType)).ToList() ?? new List<KnowledgeLinkDto>();

            packet = packet with { Units = graphUnits, Links = graphLinks };
        }

        // Model output may omit collections or IDs; drop unusable entries instead of failing the request.
        var units = (packet.Units ?? Enumerable.Empty<KnowledgeUnitDto>())
            .Where(u => !string.IsNullOrWhiteSpace(u.Id))
            .ToList();
        var links = (packet.Links ?? Enumerable.Empty<KnowledgeLinkDto>())
            .Where(l => !string.IsNullOrWhiteSpace(l.Source) && !string.IsNullOrWhiteSpace(l.Target))
            .ToList();
        packet = packet with { Units = units, Links = links };

        var allCandidateIds = units.Select(u => u.Id)
            .Concat(links.Select(l => l.Source))
            .Concat(links.Select(l => l.Target))
            .Distinct()
            .ToList();

        // Single batch query to find existing units
        var existingUnits = await dbContext.KnowledgeUnits
            .Where(u => allCandidateIds.Contains(u.Id))
            .ToDictionaryAsync(u => u.Id, cancellationToken);

        foreach (var unitDto in units)
        {
            if (!existingUnits.TryGetValue(unitDto.Id, out var unit))
            {
                unit = new KnowledgeUnit { Id = unitDto.Id, TenantId = tenantId };
                dbContext.KnowledgeUnits.Add(unit);
                existingUnits[unitDto.Id] = unit;
            }

            unit.Type = Enum.TryParse<NexusReader.Domain.Enums.KnowledgeUnitType>(unitDto.Type, true, out var type)
                ? type
                : NexusReader.Domain.Enums.KnowledgeUnitType.Snippet;
            unit.Content = unitDto.Content;

            // Link to the ebook only if one was provided, so an existing link is never cleared.
            if (ebookId.HasValue)
            {
                unit.EbookId = ebookId;
            }

            unit.MetadataJson = JsonSerializer.Serialize(unitDto.Metadata);
        }

        if (links.Count > 0)
        {
            // This method also runs on every cache hit, so skip links that are already stored.
            var linkSourceIds = links.Select(l => l.Source).Distinct().ToList();
            var storedLinks = await dbContext.KnowledgeUnitLinks
                .Where(l => linkSourceIds.Contains(l.SourceUnitId))
                .Select(l => new { l.SourceUnitId, l.TargetUnitId, l.RelationType })
                .ToListAsync(cancellationToken);
            var seenLinks = storedLinks
                .Select(l => (l.SourceUnitId, l.TargetUnitId, l.RelationType))
                .ToHashSet();

            foreach (var linkDto in links)
            {
                if (!existingUnits.ContainsKey(linkDto.Source) || !existingUnits.ContainsKey(linkDto.Target))
                {
                    _logger.LogWarning("[KnowledgeService] Skipping invalid link {Source} -> {Target}: one or both units are missing.", linkDto.Source, linkDto.Target);
                    continue;
                }

                if (!seenLinks.Add((linkDto.Source, linkDto.Target, linkDto.Relation)))
                {
                    continue;
                }

                dbContext.KnowledgeUnitLinks.Add(new KnowledgeUnitLink
                {
                    SourceUnitId = linkDto.Source,
                    TargetUnitId = linkDto.Target,
                    RelationType = linkDto.Relation
                });
            }
        }

        // Generate and upsert vectors to Qdrant in batch
        var unitsToEmbed = units.Where(u => !string.IsNullOrEmpty(u.Content)).ToList();
        if (unitsToEmbed.Count > 0)
        {
            try
            {
                var contents = unitsToEmbed.Select(u => u.Content).ToList();
                var embeddingResponse = await _retryPipeline.ExecuteAsync(async ct =>
                    await _embeddingGenerator.GenerateAsync(
                        contents,
                        new EmbeddingGenerationOptions { Dimensions = EmbeddingDimensions },
                        cancellationToken: ct), cancellationToken);
                var embeddings = embeddingResponse.ToList();

                var points = new List<PointStruct>(unitsToEmbed.Count);
                for (int i = 0; i < unitsToEmbed.Count; i++)
                {
                    var unitDto = unitsToEmbed[i];
                    points.Add(new PointStruct
                    {
                        Id = GetDeterministicGuid(unitDto.Id),
                        Vectors = embeddings[i].Vector.ToArray(),
                        Payload =
                        {
                            ["content"] = unitDto.Content,
                            ["type"] = unitDto.Type ?? string.Empty,
                            ["tenantId"] = tenantId,
                            ["ebookId"] = ebookId?.ToString() ?? string.Empty,
                            ["metadataJson"] = JsonSerializer.Serialize(unitDto.Metadata)
                        }
                    });
                }

                await EnsureCollectionExistsAsync(KnowledgeCollectionName, cancellationToken);
                await _qdrantClient.UpsertAsync(KnowledgeCollectionName, points, cancellationToken: cancellationToken);
                _logger.LogInformation("[KnowledgeService] Successfully upserted {Count} points to Qdrant collection 'knowledge_units'.", points.Count);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "[KnowledgeService] Failed to generate and upsert embeddings for knowledge units to Qdrant.");
            }
        }

        // 6. Synchronize to Neo4j graph database
        await SyncToNeo4jAsync(packet, cancellationToken);
    }

    /// <summary>
    /// Merges the packet's units as <c>KnowledgeUnit</c> nodes and its links as relationships.
    /// Failures are logged and swallowed so graph sync never fails the calling request.
    /// </summary>
    private async Task SyncToNeo4jAsync(KnowledgePacket packet, CancellationToken cancellationToken)
    {
        if (packet.Units == null || !packet.Units.Any())
        {
            return;
        }

        const string mergeNodeCypher = @"
            MERGE (u:KnowledgeUnit {id: $id})
            ON CREATE SET u.content = $content, u.type = $type
            ON MATCH SET u.content = $content, u.type = $type";

        try
        {
            await using var session = _neo4jDriver.AsyncSession();

            // 1. Merge nodes in a transaction
            await session.ExecuteWriteAsync(async tx =>
            {
                foreach (var unit in packet.Units)
                {
                    await tx.RunAsync(mergeNodeCypher, new
                    {
                        id = GetDeterministicGuid(unit.Id).ToString(),
                        content = unit.Content ?? string.Empty,
                        type = unit.Type ?? "concept"
                    });
                }
            });

            // 2. Merge links in a transaction
            if (packet.Links != null && packet.Links.Any())
            {
                await session.ExecuteWriteAsync(async tx =>
                {
                    foreach (var link in packet.Links)
                    {
                        if (string.IsNullOrWhiteSpace(link.Source) || string.IsNullOrWhiteSpace(link.Target))
                        {
                            continue;
                        }

                        // Relationship types cannot be parameterized in Cypher. The value is reduced to
                        // [A-Z0-9_] and backtick-quoted so names starting with a digit are still valid.
                        var relationType = NormalizeRelationType(link.Relation);
                        var cypher = $@"
                            MATCH (source:KnowledgeUnit {{id: $sourceId}})
                            MATCH (target:KnowledgeUnit {{id: $targetId}})
                            MERGE (source)-[r:`{relationType}`]->(target)";

                        await tx.RunAsync(cypher, new
                        {
                            sourceId = GetDeterministicGuid(link.Source).ToString(),
                            targetId = GetDeterministicGuid(link.Target).ToString()
                        });
                    }
                });
            }

            _logger.LogInformation("[KnowledgeService] Successfully synchronized {NodeCount} nodes and {LinkCount} links to Neo4j.", packet.Units.Count, packet.Links?.Count ?? 0);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "[KnowledgeService] Failed to synchronize knowledge graph to Neo4j.");
        }
    }

    /// <summary>
    /// Upper-cases a relation name and reduces it to <c>[A-Z0-9_]</c>; blank or
    /// underscore-only results fall back to <c>RELATED_TO</c>.
    /// </summary>
    private static string NormalizeRelationType(string? relation)
    {
        if (string.IsNullOrWhiteSpace(relation))
        {
            return DefaultRelationType;
        }

        var normalized = RelationTypeSanitizer.Replace(relation.Trim().ToUpperInvariant(), "_");
        return normalized.Trim('_').Length == 0 ? DefaultRelationType : normalized;
    }

    /// <summary>
    /// Creates the Qdrant collection when it does not exist. Serialized through a process-wide
    /// semaphore; errors are logged rather than thrown.
    /// </summary>
    private async Task EnsureCollectionExistsAsync(string collectionName, CancellationToken cancellationToken = default)
    {
        await _collectionSemaphore.WaitAsync(cancellationToken);
        try
        {
            var exists = await _qdrantClient.CollectionExistsAsync(collectionName, cancellationToken);
            if (!exists)
            {
                _logger.LogInformation("[KnowledgeService] Creating Qdrant collection '{CollectionName}'...", collectionName);
                await _qdrantClient.CreateCollectionAsync(
                    collectionName: collectionName,
                    vectorsConfig: new VectorParams { Size = EmbeddingDimensions, Distance = Distance.Cosine },
                    cancellationToken: cancellationToken);
                _logger.LogInformation("[KnowledgeService] Qdrant collection '{CollectionName}' created successfully.", collectionName);
            }
        }
        catch (Exception ex)
        {
            var alreadyExists =
                ex.Message.Contains("already exists", StringComparison.OrdinalIgnoreCase) ||
                (ex.InnerException != null && ex.InnerException.Message.Contains("already exists", StringComparison.OrdinalIgnoreCase));

            if (alreadyExists)
            {
                _logger.LogInformation("[KnowledgeService] Qdrant collection '{CollectionName}' was already created by another thread.", collectionName);
            }
            else
            {
                _logger.LogError(ex, "[KnowledgeService] Error ensuring Qdrant collection '{CollectionName}' exists.", collectionName);
            }
        }
        finally
        {
            _collectionSemaphore.Release();
        }
    }

    /// <summary>
    /// Maps a unit ID to a stable GUID: IDs that already parse as GUIDs are returned as-is,
    /// anything else is hashed with MD5 (used for identity only, not for security).
    /// </summary>
    private static Guid GetDeterministicGuid(string input)
    {
        if (Guid.TryParse(input, out var guid))
        {
            return guid;
        }

        using var md5 = System.Security.Cryptography.MD5.Create();
        byte[] hash = md5.ComputeHash(System.Text.Encoding.UTF8.GetBytes(input));
        return new Guid(hash);
    }

    /// <summary>Returns the UUID or numeric form of a Qdrant point ID as a string.</summary>
    private static string GetPointIdString(PointId pointId)
    {
        if (pointId == null)
        {
            return string.Empty;
        }

        return pointId.PointIdOptionsCase == PointId.PointIdOptionsOneofCase.Uuid
            ? pointId.Uuid
            : pointId.Num.ToString();
    }

    /// <summary>Builds an exact-match (keyword) condition on a payload field.</summary>
    private static Qdrant.Client.Grpc.Condition KeywordCondition(string key, string value) => new()
    {
        Field = new Qdrant.Client.Grpc.FieldCondition
        {
            Key = key,
            // Keyword is an exact match; Text would degrade to substring matching without a full-text index.
            Match = new Qdrant.Client.Grpc.Match { Keyword = value }
        }
    };

    /// <summary>Builds a filter matching points owned by <paramref name="tenantId"/> or by the "global" tenant.</summary>
    private static Qdrant.Client.Grpc.Filter BuildTenantFilter(string tenantId)
    {
        var filter = new Qdrant.Client.Grpc.Filter();
        filter.Should.Add(KeywordCondition("tenantId", tenantId));
        filter.Should.Add(KeywordCondition("tenantId", GlobalTenantId));
        return filter;
    }

    /// <summary>Embeds a single query string at the dimensionality the Qdrant collection uses.</summary>
    private async Task<float[]> GenerateQueryVectorAsync(string text, CancellationToken cancellationToken)
    {
        var response = await _retryPipeline.ExecuteAsync(async ct =>
            await _embeddingGenerator.GenerateAsync(
                new[] { text },
                new EmbeddingGenerationOptions { Dimensions = EmbeddingDimensions },
                cancellationToken: ct), cancellationToken);

        return response.First().Vector.ToArray();
    }

    /// <summary>
    /// Searches the knowledge collection. Any Qdrant failure is logged with
    /// <paramref name="failureMessage"/> and reported as an empty result.
    /// </summary>
    private async Task<List<Qdrant.Client.Grpc.ScoredPoint>> SearchKnowledgeUnitsAsync(
        float[] queryVector,
        Qdrant.Client.Grpc.Filter filter,
        int limit,
        string failureMessage,
        CancellationToken cancellationToken)
    {
        try
        {
            await EnsureCollectionExistsAsync(KnowledgeCollectionName, cancellationToken);
            var response = await _qdrantClient.SearchAsync(
                collectionName: KnowledgeCollectionName,
                vector: queryVector,
                filter: filter,
                // A non-positive limit would wrap to a huge value when cast to ulong.
                limit: (ulong)Math.Max(1, limit),
                cancellationToken: cancellationToken);
            return response.ToList();
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, failureMessage);
            return new List<Qdrant.Client.Grpc.ScoredPoint>();
        }
    }

    /// <summary>Reads a string payload field, or an empty string when the field is absent.</summary>
    private static string ReadPayloadString(Qdrant.Client.Grpc.ScoredPoint point, string key) =>
        point.Payload.TryGetValue(key, out var value) ? value.StringValue : string.Empty;

    /// <summary>Reads the "ebookId" payload field as a GUID, or null when absent or malformed.</summary>
    private static Guid? ReadEbookId(Qdrant.Client.Grpc.ScoredPoint point) =>
        Guid.TryParse(ReadPayloadString(point, "ebookId"), out var id) ? (Guid?)id : null;

    /// <summary>
    /// Deserializes a metadata JSON blob. Returns null when it is blank or malformed;
    /// malformed JSON is logged with the given stage (and unit ID when known).
    /// </summary>
    private Dictionary<string, object>? ParseMetadata(string? metadataJson, string stage, string? unitId = null)
    {
        if (string.IsNullOrEmpty(metadataJson))
        {
            return null;
        }

        try
        {
            return JsonSerializer.Deserialize<Dictionary<string, object>>(metadataJson);
        }
        catch (JsonException ex)
        {
            if (unitId is null)
            {
                _logger.LogWarning(ex, "[KnowledgeService] Failed to deserialize metadata JSON in {Stage}.", stage);
            }
            else
            {
                _logger.LogWarning(ex, "[KnowledgeService] Failed to deserialize metadata JSON for unit {UnitId} in {Stage}.", unitId, stage);
            }

            return null;
        }
    }

    /// <summary>Returns "content: summary" when the metadata has a non-empty "summary", otherwise the content.</summary>
    private static string AppendSummary(string content, Dictionary<string, object>? metadata)
    {
        string? summary = null;
        if (metadata != null && metadata.TryGetValue("summary", out var value))
        {
            summary = value?.ToString();
        }

        return string.IsNullOrEmpty(summary) ? content : $"{content}: {summary}";
    }

    /// <summary>Renders a stored unit as prompt text, enriched with its metadata summary when present.</summary>
    private string DescribeUnit(KnowledgeUnit unit, string stage) =>
        AppendSummary(unit.Content, ParseMetadata(unit.MetadataJson, stage, unit.Id));

    /// <summary>Loads titles for every ebook referenced by the given points' payloads.</summary>
    private async Task<Dictionary<Guid, string>> LoadEbookTitlesAsync(IEnumerable<Qdrant.Client.Grpc.ScoredPoint> points, CancellationToken cancellationToken)
    {
        var ebookIds = points
            .Select(point => ReadEbookId(point))
            .Where(id => id.HasValue)
            .Select(id => id.GetValueOrDefault())
            .Distinct()
            .ToList();

        if (ebookIds.Count == 0)
        {
            return new Dictionary<Guid, string>();
        }

        await using var dbContext = await _dbContextFactory.CreateDbContextAsync(cancellationToken);
        return await dbContext.Ebooks
            .Where(e => ebookIds.Contains(e.Id))
            .ToDictionaryAsync(e => e.Id, e => e.Title, cancellationToken);
    }

    /// <summary>
    /// Asks the model to score how well <paramref name="answer"/> is supported by <paramref name="context"/>.
    /// </summary>
    /// <returns>The parsed score/rationale, or a failed result when the call or parsing fails.</returns>
    // NOTE(review): the result type name is an assumption - confirm against IKnowledgeService.
    public async Task<Result<GroundednessResult>> VerifyGroundednessAsync(string answer, string context, string tenantId, CancellationToken cancellationToken = default)
    {
        var systemPrompt = @"
            You are a Fact-Checking AI.
            Evaluate if the 'Answer' is supported by the 'Context'.
            Rate the groundedness from 0.0 to 1.0.
            Return ONLY a JSON object:
            {
              ""score"": 0.9,
              ""rationale"": ""string"",
              ""isGrounded"": true
            }
            ";
        var userPrompt = $"Context: {context}\n\nAnswer: {answer}";

        try
        {
            var options = new ChatOptions
            {
                Temperature = 0.0f, // Low temperature for factual checks
                MaxOutputTokens = 500
            };

            var response = await _retryPipeline.ExecuteAsync(async ct =>
                await _chatClient.GetResponseAsync(new List<ChatMessage>
                {
                    new ChatMessage(ChatRole.System, systemPrompt),
                    new ChatMessage(ChatRole.User, userPrompt)
                }, options, cancellationToken: ct), cancellationToken);

            var rawJson = StripCodeFences(response.Text?.Trim() ?? "{}");
            var result = JsonSerializer.Deserialize<GroundednessResult>(rawJson, JsonOptions);

            return result != null
                ? Result.Ok(result)
                : Result.Fail<GroundednessResult>("Failed to parse groundedness result");
        }
        catch (Exception ex)
        {
            return Result.Fail<GroundednessResult>(new Error("Failed to verify groundedness").CausedBy(ex));
        }
    }

    /// <summary>
    /// Returns up to five knowledge snippets (tenant-owned or global) most similar to <paramref name="query"/>.
    /// A failed Qdrant search yields an empty list rather than a failure.
    /// </summary>
    public async Task<Result<List<RelevantContext>>> GetRelevantContextAsync(string query, string tenantId, CancellationToken cancellationToken = default)
    {
        try
        {
            var queryVector = await GenerateQueryVectorAsync(query, cancellationToken);
            var searchResult = await SearchKnowledgeUnitsAsync(
                queryVector,
                BuildTenantFilter(tenantId),
                5,
                "[KnowledgeService] Qdrant search failed during GetRelevantContextAsync. Returning empty search results.",
                cancellationToken);

            var contexts = searchResult.Select(point =>
            {
                var metadata = ParseMetadata(ReadPayloadString(point, "metadataJson"), "RelevantContext mapping");
                return new RelevantContext
                {
                    Text = AppendSummary(ReadPayloadString(point, "content"), metadata),
                    Confidence = point.Score
                };
            }).ToList();

            return Result.Ok(contexts);
        }
        catch (Exception ex)
        {
            return Result.Fail<List<RelevantContext>>(new Error("Failed to retrieve relevant context").CausedBy(ex));
        }
    }

    /// <summary>
    /// Semantic search over the tenant's (and global) knowledge units, enriched with directly
    /// related content from Neo4j and with source book titles from PostgreSQL.
    /// </summary>
    public async Task<Result<List<SemanticSearchResultDto>>> SearchLibrarySemanticallyAsync(string queryText, string tenantId, int limit, CancellationToken cancellationToken = default)
    {
        try
        {
            // 1. Generate the query embedding and 2. query Qdrant
            var queryVector = await GenerateQueryVectorAsync(queryText, cancellationToken);
            var searchResult = await SearchKnowledgeUnitsAsync(
                queryVector,
                BuildTenantFilter(tenantId),
                limit,
                "[KnowledgeService] Failed to search in Qdrant; collection might not exist yet.",
                cancellationToken);

            if (searchResult.Count == 0)
            {
                return Result.Ok(new List<SemanticSearchResultDto>());
            }

            // 3. Graph Expansion via Neo4j
            var candidateIds = searchResult.Select(r => GetPointIdString(r.Id)).ToList();
            var definitions = await LoadRelatedDefinitionsAsync(candidateIds);

            // 4. Retrieve Ebook Titles from PostgreSQL
            var ebookTitles = await LoadEbookTitlesAsync(searchResult, cancellationToken);

            // 5. Map results to DTOs
            var dtos = searchResult.Select(point =>
            {
                var pointId = GetPointIdString(point.Id);

                string? bookTitle = null;
                var ebookId = ReadEbookId(point);
                if (ebookId.HasValue)
                {
                    ebookTitles.TryGetValue(ebookId.Value, out bookTitle);
                }

                var snippet = ReadPayloadString(point, "content");
                if (definitions.TryGetValue(pointId, out var pointDefs) && pointDefs.Count > 0)
                {
                    snippet = $"[Context: {string.Join("; ", pointDefs)}]\n{snippet}";
                }

                return new SemanticSearchResultDto
                {
                    ContentHash = pointId,
                    Snippet = snippet,
                    UnitType = ReadPayloadString(point, "type"),
                    RelevanceScore = point.Score,
                    SourceBookTitle = bookTitle,
                    Metadata = ParseMetadata(ReadPayloadString(point, "metadataJson"), "search library mapping")
                };
            }).ToList();

            return Result.Ok(dtos);
        }
        catch (Exception ex)
        {
            return Result.Fail<List<SemanticSearchResultDto>>(new Error("Failed to search library semantically").CausedBy(ex));
        }
    }

    /// <summary>
    /// For each candidate node, collects the content of the nodes it points to in Neo4j.
    /// Returns an empty map when the graph query fails.
    /// </summary>
    private async Task<Dictionary<string, List<string>>> LoadRelatedDefinitionsAsync(List<string> candidateIds)
    {
        var definitions = new Dictionary<string, List<string>>();
        if (candidateIds.Count == 0)
        {
            return definitions;
        }

        const string cypher = @"
            MATCH (source:KnowledgeUnit)-[r]->(target:KnowledgeUnit)
            WHERE source.id IN $candidateIds
            RETURN source.id AS sourceId, target.content AS targetContent";

        try
        {
            await using var session = _neo4jDriver.AsyncSession();
            var records = await session.ExecuteReadAsync(async tx =>
            {
                var cursor = await tx.RunAsync(cypher, new { candidateIds });
                return await cursor.ToListAsync();
            });

            foreach (var record in records)
            {
                var sourceId = record["sourceId"].As<string>();
                if (!definitions.TryGetValue(sourceId, out var targets))
                {
                    targets = new List<string>();
                    definitions[sourceId] = targets;
                }

                targets.Add(record["targetContent"].As<string>());
            }
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "[KnowledgeService] Neo4j graph expansion query failed.");
        }

        return definitions;
    }

    /// <summary>
    /// Answers <paramref name="question"/> from retrieved knowledge only (RAG): vector search in Qdrant,
    /// graph expansion in Neo4j, then a grounded JSON answer from the chat model with hydrated citations.
    /// </summary>
    /// <returns>
    /// The grounded answer; a fixed "cannot answer" response when nothing relevant is found;
    /// or a failed result when the flow or response parsing fails.
    /// </returns>
    public async Task<Result<GroundedResponseDto>> AskQuestionAsync(
        string question,
        string tenantId,
        Guid? ebookId = null,
        int limit = 5,
        CancellationToken cancellationToken = default)
    {
        try
        {
            // 1. Generate the embedding for the question
            var queryVector = await GenerateQueryVectorAsync(question, cancellationToken);

            // 2. Query Qdrant: (tenantId OR "global") AND optionally the given ebook
            var filter = new Qdrant.Client.Grpc.Filter();
            filter.Must.Add(new Qdrant.Client.Grpc.Condition { Filter = BuildTenantFilter(tenantId) });
            if (ebookId.HasValue)
            {
                filter.Must.Add(KeywordCondition("ebookId", ebookId.Value.ToString()));
            }

            var searchResult = await SearchKnowledgeUnitsAsync(
                queryVector,
                filter,
                limit,
                "[KnowledgeService] Qdrant search failed during RAG retrieval.",
                cancellationToken);

            if (searchResult.Count == 0)
            {
                return Result.Ok(CreateNoAnswerResponse());
            }

            // 3. Graph Expansion via Neo4j
            var pointMap = searchResult.ToDictionary(r => GetPointIdString(r.Id), r => r);
            var guidMap = await LoadUnitsByPointIdAsync(tenantId, ebookId, cancellationToken);
            var relatedContexts = await BuildRelatedContextsAsync(searchResult, guidMap);

            // 4. Retrieve Book Titles from PostgreSQL to populate citations
            var ebookTitles = await LoadEbookTitlesAsync(searchResult, cancellationToken);

            // 5. Build prompt and invoke Gemini with structured JSON formatting
            var contextBlocksText = string.Join("\n\n", relatedContexts);
            var systemPrompt = PromptRegistry.GroundedRAGSystemPrompt;
            var userPrompt = $"Context:\n{contextBlocksText}\n\nQuestion: {question}";
            var options = new ChatOptions
            {
                Temperature = 0.0f,
                MaxOutputTokens = 1500
            };

            var chatResponse = await _retryPipeline.ExecuteAsync(async ct =>
                await _chatClient.GetResponseAsync(new List<ChatMessage>
                {
                    new ChatMessage(ChatRole.System, systemPrompt),
                    new ChatMessage(ChatRole.User, userPrompt)
                }, options, cancellationToken: ct), cancellationToken);

            var rawJson = StripCodeFences(chatResponse.Text?.Trim() ?? string.Empty);

            // Handle direct text fallback when model bypasses JSON format
            if (!rawJson.StartsWith('{') &&
                (rawJson.Contains("cannot answer", StringComparison.OrdinalIgnoreCase) ||
                 rawJson.Contains("context does not contain", StringComparison.OrdinalIgnoreCase) ||
                 rawJson.Contains("provided book context", StringComparison.OrdinalIgnoreCase)))
            {
                return Result.Ok(CreateNoAnswerResponse());
            }

            rawJson = JsonRepairHelper.Repair(rawJson);

            try
            {
                var groundedResult = JsonSerializer.Deserialize<GroundedResponseDto>(rawJson, JsonOptions);
                if (groundedResult == null || string.IsNullOrWhiteSpace(groundedResult.Answer))
                {
                    return Result.Fail<GroundedResponseDto>("Failed to deserialize grounded RAG response.");
                }

                // Hydrate book titles, author, and page number for citations if unknown.
                // The model may omit the citation list or individual IDs entirely.
                if (groundedResult.Citations is not null)
                {
                    foreach (var citation in groundedResult.Citations)
                    {
                        if (citation is null || string.IsNullOrEmpty(citation.CitationId))
                        {
                            continue;
                        }

                        if (pointMap.TryGetValue(citation.CitationId, out var point) &&
                            ReadEbookId(point) is Guid citedEbookId &&
                            ebookTitles.TryGetValue(citedEbookId, out var title))
                        {
                            citation.SourceBook = title;
                        }

                        // Look up from guidMap to get exact page number and author
                        if (!guidMap.TryGetValue(citation.CitationId, out var unit))
                        {
                            continue;
                        }

                        if (unit.Ebook?.Author != null)
                        {
                            citation.Author = unit.Ebook.Author.Name;
                        }

                        var meta = ParseMetadata(unit.MetadataJson, "AskQuestionAsync citation mapping", unit.Id);
                        if (meta != null &&
                            meta.TryGetValue("page", out var pageObj) &&
                            int.TryParse(pageObj?.ToString(), out var pageVal))
                        {
                            citation.PageNumber = pageVal;
                        }
                    }
                }

                return Result.Ok(groundedResult);
            }
            catch (JsonException ex)
            {
                _logger.LogError(ex, "[KnowledgeService] JSON deserialization failed for grounding response. Raw text: {Text}", rawJson);
                return Result.Fail<GroundedResponseDto>($"Failed to parse AI grounded response: {ex.Message}");
            }
        }
        catch (Exception ex)
        {
            return Result.Fail<GroundedResponseDto>(new Error("Failed to execute RAG retrieval flow").CausedBy(ex));
        }
    }

    /// <summary>Creates the fixed response returned when the retrieved context cannot answer the question.</summary>
    // NOTE(review): target-typed new() assumes Citations is declared as a concrete List<T> - confirm on GroundedResponseDto.
    private static GroundedResponseDto CreateNoAnswerResponse() => new()
    {
        Answer = NoAnswerMessage,
        Citations = new()
    };

    /// <summary>
    /// Loads the tenant's knowledge units (with ebook and author) keyed by the deterministic GUID
    /// used as their Qdrant/Neo4j ID. Returns an empty map when the query fails.
    /// </summary>
    private async Task<Dictionary<string, KnowledgeUnit>> LoadUnitsByPointIdAsync(string tenantId, Guid? ebookId, CancellationToken cancellationToken)
    {
        var unitsByPointId = new Dictionary<string, KnowledgeUnit>();
        try
        {
            await using var dbContext = await _dbContextFactory.CreateDbContextAsync(cancellationToken);

            // The deterministic GUID cannot be computed in SQL, so every unit in scope is loaded and mapped here.
            var units = await dbContext.KnowledgeUnits
                .Include(u => u.Ebook)
                .ThenInclude(e => e.Author)
                .Where(u => u.TenantId == tenantId && (ebookId == null || u.EbookId == ebookId))
                .ToListAsync(cancellationToken);

            foreach (var unit in units)
            {
                // TryAdd keeps the first unit on a GUID collision instead of discarding the whole map.
                unitsByPointId.TryAdd(GetDeterministicGuid(unit.Id).ToString(), unit);
            }
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "[KnowledgeService] Failed to load KnowledgeUnits from PostgreSQL for Guid mapping.");
        }

        return unitsByPointId;
    }

    /// <summary>
    /// Builds the context blocks for the RAG prompt: each search hit plus its outgoing relations from
    /// Neo4j. Hits that Neo4j did not return (or all hits, when the query fails) fall back to their
    /// stored unit or Qdrant payload so the prompt never silently loses retrieved content.
    /// </summary>
    private async Task<List<string>> BuildRelatedContextsAsync(
        List<Qdrant.Client.Grpc.ScoredPoint> searchResult,
        Dictionary<string, KnowledgeUnit> guidMap)
    {
        const string cypher = @"
            MATCH (source:KnowledgeUnit)
            WHERE source.id IN $candidateIds
            OPTIONAL MATCH (source)-[r]->(target:KnowledgeUnit)
            RETURN source.id AS sourceId, source.content AS sourceContent,
                   collect({ targetId: target.id, targetContent: target.content, relation: type(r) }) AS relations";

        var relatedContexts = new List<string>();
        var expandedIds = new HashSet<string>();
        var candidateIds = searchResult.Select(r => GetPointIdString(r.Id)).ToList();

        try
        {
            await using var session = _neo4jDriver.AsyncSession();
            var records = await session.ExecuteReadAsync(async tx =>
            {
                var cursor = await tx.RunAsync(cypher, new { candidateIds });
                return await cursor.ToListAsync();
            });

            foreach (var record in records)
            {
                var sourceId = record["sourceId"].As<string>();
                var sourceText = guidMap.TryGetValue(sourceId, out var sourceUnit)
                    ? DescribeUnit(sourceUnit, "AskQuestionAsync source hydration")
                    : record["sourceContent"].As<string>();

                relatedContexts.Add($"[Source ID: {sourceId}] {sourceText}");
                expandedIds.Add(sourceId);

                var relations = record["relations"].As<List<object>>();
                if (relations == null)
                {
                    continue;
                }

                foreach (var relObj in relations)
                {
                    if (relObj is not System.Collections.IDictionary relDict)
                    {
                        continue;
                    }

                    var targetId = relDict["targetId"]?.ToString();
                    var targetContent = relDict["targetContent"]?.ToString();
                    var relation = relDict["relation"]?.ToString();

                    // OPTIONAL MATCH yields an all-null entry when a source has no outgoing relations.
                    if (string.IsNullOrEmpty(targetContent) || string.IsNullOrEmpty(relation))
                    {
                        continue;
                    }

                    var targetText = !string.IsNullOrEmpty(targetId) && guidMap.TryGetValue(targetId, out var targetUnit)
                        ? DescribeUnit(targetUnit, "AskQuestionAsync target hydration")
                        : targetContent;

                    relatedContexts.Add($"[Related Context ({relation}) to {sourceId}] {targetText}");
                }
            }
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "[KnowledgeService] Neo4j graph expansion failed. Falling back to direct Qdrant points.");

            // Discard partial output so the fallback below does not duplicate entries.
            relatedContexts.Clear();
            expandedIds.Clear();
        }

        foreach (var point in searchResult)
        {
            var sourceId = GetPointIdString(point.Id);
            if (expandedIds.Contains(sourceId))
            {
                continue;
            }

            var sourceText = guidMap.TryGetValue(sourceId, out var sourceUnit)
                ? DescribeUnit(sourceUnit, "fallback AskQuestionAsync")
                : ReadPayloadString(point, "content");

            relatedContexts.Add($"[Source ID: {sourceId}] {sourceText}");
        }

        return relatedContexts;
    }

    /// <summary>
    /// Wipes the semantic cache, knowledge units and links from PostgreSQL, then drops the Qdrant
    /// collection and Neo4j nodes on a best-effort basis (those failures are logged, not returned).
    /// </summary>
    public async Task<Result> ClearCacheAsync(CancellationToken cancellationToken = default)
    {
        await using var dbContext = await _dbContextFactory.CreateDbContextAsync(cancellationToken);
        try
        {
            // Links reference units, so they are deleted first to stay valid under a restrictive foreign key.
            await dbContext.KnowledgeUnitLinks.ExecuteDeleteAsync(cancellationToken);
            await dbContext.KnowledgeUnits.ExecuteDeleteAsync(cancellationToken);
            await dbContext.SemanticKnowledgeCache.ExecuteDeleteAsync(cancellationToken);

            try
            {
                await _qdrantClient.DeleteCollectionAsync(KnowledgeCollectionName, cancellationToken: cancellationToken);
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "[KnowledgeService] Failed to drop Qdrant collection 'knowledge_units' during cache clear.");
            }

            try
            {
                await using var session = _neo4jDriver.AsyncSession();
                await session.ExecuteWriteAsync(async tx =>
                {
                    await tx.RunAsync("MATCH (n:KnowledgeUnit) DETACH DELETE n");
                });
                _logger.LogInformation("[KnowledgeService] Successfully wiped Neo4j 'KnowledgeUnit' nodes.");
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "[KnowledgeService] Failed to wipe Neo4j graph during cache clear.");
            }

            return Result.Ok();
        }
        catch (Exception ex)
        {
            return Result.Fail(new Error("Failed to clear knowledge cache").CausedBy(ex));
        }
    }

    /// <summary>Counts tokens in <paramref name="text"/> with the configured tokenizer; 0 for null or empty input.</summary>
    private int EstimateTokenCount(string text) =>
        string.IsNullOrEmpty(text) ? 0 : _tokenizer.CountTokens(text);
}