feat: normalize subscription architecture, integrate pgvector, and implement Stripe webhook subscription management.
This commit is contained in:
@@ -12,6 +12,7 @@ using Polly;
|
||||
using Polly.Registry;
|
||||
using Microsoft.Extensions.Options;
|
||||
using NexusReader.Infrastructure.Configuration;
|
||||
using Pgvector;
|
||||
using Pgvector.EntityFrameworkCore;
|
||||
|
||||
namespace NexusReader.Infrastructure.Services;
|
||||
@@ -20,7 +21,7 @@ public class KnowledgeService : IKnowledgeService
|
||||
{
|
||||
private readonly IChatClient _chatClient;
|
||||
private readonly IEmbeddingGenerator<string, Embedding<float>> _embeddingGenerator;
|
||||
private readonly AppDbContext _dbContext;
|
||||
private readonly IDbContextFactory<AppDbContext> _dbContextFactory;
|
||||
private readonly ResiliencePipeline _retryPipeline;
|
||||
private readonly AiSettings _settings;
|
||||
private readonly Tokenizer _tokenizer;
|
||||
@@ -29,13 +30,13 @@ public class KnowledgeService : IKnowledgeService
|
||||
public KnowledgeService(
|
||||
IChatClient chatClient,
|
||||
IEmbeddingGenerator<string, Embedding<float>> embeddingGenerator,
|
||||
AppDbContext dbContext,
|
||||
IDbContextFactory<AppDbContext> dbContextFactory,
|
||||
ResiliencePipelineProvider<string> pipelineProvider,
|
||||
IOptions<AiSettings> settings)
|
||||
{
|
||||
_chatClient = chatClient;
|
||||
_embeddingGenerator = embeddingGenerator;
|
||||
_dbContext = dbContext;
|
||||
_dbContextFactory = dbContextFactory;
|
||||
_retryPipeline = pipelineProvider.GetPipeline("ai-retry");
|
||||
_settings = settings.Value;
|
||||
// Use Tiktoken (cl100k_base) which is a standard for modern LLMs and provides
|
||||
@@ -63,40 +64,30 @@ public class KnowledgeService : IKnowledgeService
|
||||
return await GetKnowledgeInternalAsync(text, tenantId, PromptRegistry.KM_ExtractionPrompt, "km_map", cancellationToken);
|
||||
}
|
||||
|
||||
private async Task<Result<KnowledgePacket>> GetKnowledgeInternalAsync(string text, string tenantId, string systemPrompt, string cacheSuffix, CancellationToken cancellationToken)
|
||||
private async Task<Result<KnowledgePacket>> GetKnowledgeInternalAsync(string text, string tenantId, string systemPrompt, string traceType, CancellationToken cancellationToken)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(text))
|
||||
{
|
||||
return Result.Fail("Input text is empty.");
|
||||
}
|
||||
if (string.IsNullOrWhiteSpace(text)) return Result.Fail("Input text is empty.");
|
||||
|
||||
Console.WriteLine($"[KnowledgeService] Starting extraction ({cacheSuffix}) for text sample: {text.Substring(0, Math.Min(text.Length, 50))}...");
|
||||
|
||||
var normalizedText = ContentHasher.Normalize(text);
|
||||
|
||||
var tokenCount = EstimateTokenCount(normalizedText);
|
||||
if (tokenCount > _settings.MaxInputTokens)
|
||||
{
|
||||
return Result.Fail($"Input exceeds maximum token limit. Estimated tokens: {tokenCount}, limit: {_settings.MaxInputTokens}.");
|
||||
}
|
||||
|
||||
var hash = ContentHasher.ComputeHash(normalizedText) + "_" + cacheSuffix;
|
||||
using var dbContext = await _dbContextFactory.CreateDbContextAsync(cancellationToken);
|
||||
var normalizedText = text.Trim();
|
||||
var hash = ContentHasher.ComputeHash(normalizedText);
|
||||
|
||||
// 1. Check Cache
|
||||
var cached = await _dbContext.SemanticKnowledgeCache
|
||||
.FirstOrDefaultAsync(c => c.ContentHash == hash && c.TenantId == tenantId && c.PromptVersion == PromptVersion, cancellationToken);
|
||||
|
||||
if (cached != null)
|
||||
var cached = await dbContext.SemanticKnowledgeCache
|
||||
.FirstOrDefaultAsync(c => c.ContentHash == hash && c.TenantId == tenantId, cancellationToken);
|
||||
|
||||
if (cached != null && cached.PromptVersion == PromptVersion)
|
||||
{
|
||||
Console.WriteLine($"[KnowledgeService] Cache Hit for {traceType} ({hash})");
|
||||
try
|
||||
{
|
||||
var packet = JsonSerializer.Deserialize<KnowledgePacket>(cached.JsonData, new JsonSerializerOptions { PropertyNameCaseInsensitive = true });
|
||||
if (packet != null) return Result.Ok(packet);
|
||||
}
|
||||
catch { }
|
||||
catch { /* fallback to regen */ }
|
||||
}
|
||||
|
||||
// 2. Call AI Client
|
||||
Console.WriteLine($"[KnowledgeService] Cache Miss for {traceType} ({hash}). Requesting AI...");
|
||||
try
|
||||
{
|
||||
var options = new ChatOptions
|
||||
@@ -147,26 +138,23 @@ public class KnowledgeService : IKnowledgeService
|
||||
ModelId = _settings.Model,
|
||||
PromptVersion = PromptVersion,
|
||||
TenantId = tenantId,
|
||||
Vector = vector,
|
||||
Vector = vector != null ? new Vector(vector) : null,
|
||||
CreatedAt = DateTime.UtcNow
|
||||
};
|
||||
|
||||
if (cached == null) _dbContext.SemanticKnowledgeCache.Add(cacheEntry);
|
||||
if (cached == null) dbContext.SemanticKnowledgeCache.Add(cacheEntry);
|
||||
else
|
||||
{
|
||||
cached.JsonData = jsonResponse;
|
||||
cached.OriginalText = normalizedText;
|
||||
cached.Vector = vector;
|
||||
cached.Vector = vector != null ? new Vector(vector) : null;
|
||||
cached.CreatedAt = DateTime.UtcNow;
|
||||
}
|
||||
|
||||
// 5. Process KM-RAG Units and Links if present
|
||||
if (knowledgePacket.Units.Any())
|
||||
{
|
||||
await ProcessKnowledgeUnitsAsync(knowledgePacket, tenantId, cancellationToken);
|
||||
}
|
||||
// 5. Process structured KnowledgeUnits (Graph Expansion)
|
||||
await ProcessKnowledgeUnitsAsync(knowledgePacket, tenantId, dbContext, cancellationToken);
|
||||
|
||||
await _dbContext.SaveChangesAsync(cancellationToken);
|
||||
await dbContext.SaveChangesAsync(cancellationToken);
|
||||
return Result.Ok(knowledgePacket);
|
||||
}
|
||||
catch (JsonException ex)
|
||||
@@ -181,39 +169,70 @@ public class KnowledgeService : IKnowledgeService
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
/// Upserts the knowledge units carried by <paramref name="packet"/> into
/// <paramref name="dbContext"/> and stages the links between them.
/// Changes are only tracked on the context; this method does not call
/// SaveChanges, so persisting them is left to the caller.
/// </summary>
/// <param name="packet">Extraction result whose Units and Links are processed.</param>
/// <param name="tenantId">Tenant assigned to units that are newly created here.</param>
/// <param name="dbContext">Context used for the lookup and for tracking new/updated entities.</param>
/// <param name="cancellationToken">Token flowed to the database query and the embedding calls.</param>
private async Task ProcessKnowledgeUnitsAsync(KnowledgePacket packet, string tenantId, AppDbContext dbContext, CancellationToken cancellationToken)
{
    // Every id the packet mentions, either as a unit or as a link endpoint,
    // so a single round-trip can resolve all units that already exist.
    var candidateIds = packet.Units.Select(u => u.Id)
        .Concat(packet.Links.Select(l => l.Source))
        .Concat(packet.Links.Select(l => l.Target))
        .Distinct()
        .ToList();

    // NOTE(review): this lookup is keyed by Id only and is not scoped to tenantId,
    // so a unit with the same Id stored under another tenant would be updated
    // in place below. Confirm whether Ids are globally unique by design.
    var knownUnits = await dbContext.KnowledgeUnits
        .Where(u => candidateIds.Contains(u.Id))
        .ToDictionaryAsync(u => u.Id, cancellationToken);

    foreach (var unitDto in packet.Units)
    {
        if (!knownUnits.TryGetValue(unitDto.Id, out var unit))
        {
            unit = new KnowledgeUnit { Id = unitDto.Id, TenantId = tenantId };
            dbContext.KnowledgeUnits.Add(unit);

            // Register immediately so later duplicates in the same packet and
            // the link validation below see this unit.
            knownUnits[unitDto.Id] = unit;
        }

        // Unknown or missing type strings fall back to Snippet.
        unit.Type = Enum.TryParse<NexusReader.Domain.Enums.KnowledgeUnitType>(unitDto.Type, true, out var parsedType)
            ? parsedType
            : NexusReader.Domain.Enums.KnowledgeUnitType.Snippet;
        unit.Content = unitDto.Content;
        unit.SourceId = "extracted";
        unit.MetadataJson = JsonSerializer.Serialize(unitDto.Metadata);

        // Generate a unit-specific embedding for granular retrieval. This is
        // best-effort: a failed embedding must not abort the whole extraction,
        // but a cancellation requested by the caller must still propagate.
        try
        {
            var content = unit.Content;
            var embeddings = await _retryPipeline.ExecuteAsync(async ct =>
                await _embeddingGenerator.GenerateAsync(new[] { content }, cancellationToken: ct), cancellationToken);
            unit.Vector = new Vector(embeddings.First().Vector.ToArray());
        }
        catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
        {
            Console.WriteLine($"[KnowledgeService] WARNING: Embedding failed for unit {unit.Id}: {ex.Message}");
        }
    }

    // Identical (source, target, relation) triples repeated inside one packet are
    // staged once. Links already persisted by an earlier run are NOT checked here.
    var distinctLinks = packet.Links
        .Select(l => (l.Source, l.Target, l.Relation))
        .Distinct();

    foreach (var (source, target, relation) in distinctLinks)
    {
        // Only link units that exist in the database or were created above.
        if (knownUnits.ContainsKey(source) && knownUnits.ContainsKey(target))
        {
            dbContext.KnowledgeUnitLinks.Add(new KnowledgeUnitLink
            {
                SourceUnitId = source,
                TargetUnitId = target,
                RelationType = relation
            });
        }
        else
        {
            Console.WriteLine($"[KnowledgeService] WARNING: Skipping invalid link {source} -> {target} (Missing units).");
        }
    }
}
|
||||
|
||||
@@ -257,30 +276,21 @@ public class KnowledgeService : IKnowledgeService
|
||||
|
||||
public async Task<Result<List<RelevantContext>>> GetRelevantContextAsync(string query, string tenantId, CancellationToken cancellationToken = default)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(query)) return Result.Fail("Query is empty.");
|
||||
|
||||
using var dbContext = await _dbContextFactory.CreateDbContextAsync(cancellationToken);
|
||||
try
|
||||
{
|
||||
// 1. Generate embedding for query
|
||||
var embeddingResponse = await _retryPipeline.ExecuteAsync(async ct =>
|
||||
var queryEmbedding = await _retryPipeline.ExecuteAsync(async ct =>
|
||||
await _embeddingGenerator.GenerateAsync(new[] { query }, cancellationToken: ct), cancellationToken);
|
||||
var queryVector = embeddingResponse.First().Vector.ToArray();
|
||||
var queryVector = new Vector(queryEmbedding.First().Vector.ToArray());
|
||||
|
||||
// 2. Search using pgvector
|
||||
var results = await _dbContext.SemanticKnowledgeCache
|
||||
.AsNoTracking()
|
||||
.Where(x => (x.TenantId == tenantId || x.TenantId == "global") && x.Vector != null)
|
||||
.OrderBy(x => x.Vector!.CosineDistance(queryVector))
|
||||
var relevantUnits = await dbContext.KnowledgeUnits
|
||||
.Where(u => u.TenantId == tenantId)
|
||||
.OrderBy(u => u.Vector!.L2Distance(queryVector))
|
||||
.Take(5)
|
||||
.Select(x => new RelevantContext
|
||||
{
|
||||
Text = x.OriginalText,
|
||||
SourceId = x.ContentHash,
|
||||
Confidence = 1 - x.Vector!.CosineDistance(queryVector)
|
||||
})
|
||||
.Select(u => new RelevantContext { Text = u.Content, Confidence = 1.0 })
|
||||
.ToListAsync(cancellationToken);
|
||||
|
||||
return Result.Ok(results);
|
||||
return Result.Ok(relevantUnits);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
@@ -290,16 +300,17 @@ public class KnowledgeService : IKnowledgeService
|
||||
|
||||
/// <summary>
/// Deletes all rows from the semantic knowledge cache, the knowledge unit links
/// and the knowledge units, using set-based deletes on a context obtained from
/// the factory.
/// </summary>
/// <param name="cancellationToken">Token flowed to context creation and to each delete.</param>
/// <returns>
/// A successful result when all deletes complete; otherwise a failed result
/// wrapping the exception that was thrown.
/// </returns>
public async Task<Result> ClearCacheAsync(CancellationToken cancellationToken = default)
{
    try
    {
        // Created inside the try so a factory/connection failure is reported
        // as a failed Result like every other error in this method.
        await using var dbContext = await _dbContextFactory.CreateDbContextAsync(cancellationToken);

        Console.WriteLine("[KnowledgeService] Clearing SemanticKnowledgeCache...");
        await dbContext.SemanticKnowledgeCache.ExecuteDeleteAsync(cancellationToken);

        // Links are removed before the units they point at, so the deletes succeed
        // regardless of whether a cascading foreign key is configured.
        // NOTE(review): the three deletes are separate statements and are not atomic.
        await dbContext.KnowledgeUnitLinks.ExecuteDeleteAsync(cancellationToken);
        await dbContext.KnowledgeUnits.ExecuteDeleteAsync(cancellationToken);

        return Result.Ok();
    }
    catch (Exception ex)
    {
        return Result.Fail(new Error("Failed to clear knowledge cache").CausedBy(ex));
    }
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user