
Conversation

@morgabra
Contributor

@morgabra morgabra commented Aug 22, 2025

Spike/Experiment to see what a storage engine interface would look like and try out a different implementation.

This would be a multi-phased PR in practice, but I just wanted to start with a vibe check on the kind of performance we could get out of different storage engines.

  • Introduce a new c1z format
  • Put the current sqlite functionality behind an interface
  • Add a new implementation trying out 'pebble'

TODO

  • Tuning the engine at all for perf
  • Profile and see what's going on
  • Actually run a real sync and profile/test that
  • This tries not to break the existing interfaces/API, but it's not particularly clean, so the StorageBackend interface is really messy/leaky.
  • Cleaning up old syncs loses pretty badly to sqlite (even though they both compact/vacuum), but pebble gives you RangeDelete, which is a single tombstone, so it's really fast without the compaction. Maybe there are tunables here (see the sketch below).
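
For the cleanup point above, here is a minimal sketch of what a pebble-based cleanup path can look like, assuming each sync's data lives under a per-sync key prefix; the prefix layout and helper name are illustrative, not the actual engine code:

package main

import (
    "fmt"
    "log"

    "github.com/cockroachdb/pebble"
)

// deleteSyncData drops every key belonging to one sync with a single range
// tombstone. The "sync/<syncID>/" prefix layout is illustrative only.
func deleteSyncData(db *pebble.DB, syncID string) error {
    start := []byte(fmt.Sprintf("sync/%s/", syncID))
    end := []byte(fmt.Sprintf("sync/%s0", syncID)) // '0' sorts immediately after '/'
    return db.DeleteRange(start, end, pebble.Sync)
}

func main() {
    db, err := pebble.Open("cleanup-demo.pebble", &pebble.Options{})
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    if err := deleteSyncData(db, "sync-123"); err != nil {
        log.Fatal(err)
    }
}

A single DeleteRange writes one tombstone no matter how many keys fall inside the range, which is why this style of cleanup can skip the per-row delete work SQLite does before vacuuming.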

With no attempt at actual performance tuning:

=== RUN   TestEngineComparison
🚀 Engine Comparison Benchmark Suite
📁 Results will be saved to: engine_comparison_results.json
📏 Scale: large

🏗️ Running large scale benchmarks...

⏱️  Total runtime: 40.14875825s
💾 Detailed results saved to: engine_comparison_results.json

================================================================================
🏁 ENGINE COMPARISON SUMMARY
================================================================================
📊 Total Tests: 10
🥇 Pebble Wins: 8
🥈 SQLite Wins: 2
⚡ Average Speedup Ratio: 606.90% (positive = Pebble faster)
🔢 Average Alloc Ratio: 202.85% (positive = Pebble uses more per op)
🚀 Pebble Avg Ops/Sec: 17445258.93
🗃️  SQLite Avg Ops/Sec: 742395735.11
🏆 Recommended Engine: pebble
💡 Reason: Pebble won 8/10 tests with 606.90% better average performance

================================================================================
📈 DETAILED RESULTS
================================================================================
🧪 RawInserts_100000:
   Pebble: 1031308.82 ops/sec, 9.01 allocs/op
   SQLite: 91485.92 ops/sec, 53.46 allocs/op
   Winner: pebble (1027.29% faster)
   Allocs/Op Diff: -83.14% (positive = Pebble uses more)

🧪 SequentialReads_setup_100000_read_10000:
   Pebble: 20207123.01 ops/sec, 1.31 allocs/op
   SQLite: 1766134.48 ops/sec, 13.00 allocs/op
   Winner: pebble (1044.14% faster)
   Allocs/Op Diff: -89.95% (positive = Pebble uses more)

🧪 RandomReads_setup_100000_read_10000:
   Pebble: 299948.64 ops/sec, 48.02 allocs/op
   SQLite: 38254.72 ops/sec, 355.00 allocs/op
   Winner: pebble (684.08% faster)
   Allocs/Op Diff: -86.47% (positive = Pebble uses more)

🧪 UpsertIfNewer_setup_100000_upsert_10000:
   Pebble: 232941.63 ops/sec, 9.03 allocs/op
   SQLite: 124738.12 ops/sec, 53.50 allocs/op
   Winner: pebble (86.74% faster)
   Allocs/Op Diff: -83.13% (positive = Pebble uses more)

🧪 UpsertIfNewer_setup_500000_upsert_50000:
   Pebble: 228221.36 ops/sec, 9.03 allocs/op
   SQLite: 95094.34 ops/sec, 53.47 allocs/op
   Winner: pebble (139.99% faster)
   Allocs/Op Diff: -83.12% (positive = Pebble uses more)

🧪 CleanupSync_100000:
   Pebble: 925190.15 ops/sec, 0.01 allocs/op
   SQLite: 1836727197.03 ops/sec, 0.00 allocs/op
   Winner: sqlite (99.95% faster)
   Allocs/Op Diff: 747.85% (positive = Pebble uses more)

🧪 CleanupSync_500000:
   Pebble: 1273240.35 ops/sec, 0.01 allocs/op
   SQLite: 5576208178.44 ops/sec, 0.00 allocs/op
   Winner: sqlite (99.98% faster)
   Allocs/Op Diff: 1966.14% (positive = Pebble uses more)

🧪 RawInserts_500000:
   Pebble: 1077591.24 ops/sec, 9.00 allocs/op
   SQLite: 84341.46 ops/sec, 53.46 allocs/op
   Winner: pebble (1177.65% faster)
   Allocs/Op Diff: -83.16% (positive = Pebble uses more)

🧪 SequentialReads_setup_100000_read_50000:
   Pebble: 148938515.20 ops/sec, 0.26 allocs/op
   SQLite: 8783036.51 ops/sec, 2.60 allocs/op
   Winner: pebble (1595.75% faster)
   Allocs/Op Diff: -89.94% (positive = Pebble uses more)

🧪 RandomReads_setup_100000_read_25000:
   Pebble: 238508.84 ops/sec, 47.51 allocs/op
   SQLite: 38890.04 ops/sec, 355.00 allocs/op
   Winner: pebble (513.29% faster)
   Allocs/Op Diff: -86.62% (positive = Pebble uses more)

Summary by CodeRabbit

  • New Features

    • Introduces a Pebble-based storage backend with pluggable engine selection.
    • Adds key-based pagination, conditional upserts, and full CRUD for resources, entitlements, grants, and assets.
    • Provides sync lifecycle (start/checkpoint/end), stats, diff/clone, cleanup controls, and manual compaction.
    • Adds observability: metrics, health/system stats, and slow query logging.
  • Refactor

    • Replaces legacy C1File with storage engine interfaces; SQLite APIs moved under a dedicated package.
  • Documentation

    • Adds design, requirements, tasks, and a Pebble vs SQLite benchmark guide.
  • Tests

    • Extensive unit, integration, and performance benchmarks; cross-engine comparison with sample JSON results.
  • Chores

    • Adds dependencies to support Pebble and observability.

@coderabbitai

coderabbitai bot commented Aug 22, 2025

I'll provide a comprehensive overview of the changes with the requested sections.

Walkthrough

This pull request introduces a new Pebble-based storage engine for the Baton SDK alongside the existing SQLite backend, which moves behind a new storage engine interface. The implementation provides a comprehensive, high-performance storage solution with key-value management, secondary indexing, sync lifecycle tracking, and observability features.
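
As a rough illustration of the key-value approach described here (the key layout, helper name, and paging behavior below are assumptions for the sketch, not the PR's actual key encoder), key-based pagination on pebble typically comes down to a prefix-bounded iterator with a resumable lower bound:

import (
    "github.com/cockroachdb/pebble"
)

// listPage scans keys under prefix, starting from resumeKey when set, and
// returns up to pageSize values plus the key to resume from on the next page.
func listPage(db *pebble.DB, prefix, resumeKey []byte, pageSize int) ([][]byte, []byte, error) {
    lower := prefix
    if len(resumeKey) > 0 {
        lower = resumeKey
    }
    // Illustrative upper bound; a real implementation would compute the exact
    // prefix successor instead of appending 0xff.
    upper := append(append([]byte{}, prefix...), 0xff)

    iter, err := db.NewIter(&pebble.IterOptions{LowerBound: lower, UpperBound: upper})
    if err != nil {
        return nil, nil, err
    }
    defer iter.Close()

    var values [][]byte
    for iter.First(); iter.Valid(); iter.Next() {
        if len(values) == pageSize {
            // Over-fetched one row: its key becomes the next page token.
            return values, append([]byte{}, iter.Key()...), nil
        }
        values = append(values, append([]byte{}, iter.Value()...))
    }
    return values, nil, iter.Error()
}

The resume key round-trips as an opaque page token, which is what makes pagination key-based rather than offset-based.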

Changes

  • Core Engine Design (.kiro/specs/pebble-storage-engine/*): Introduced comprehensive design documents outlining Pebble storage engine architecture, requirements, and implementation tasks.
  • Storage Engine Interface (pkg/dotc1z/engine/engine.go): Added StorageEngine and AttachedStorageEngine interfaces defining core storage backend capabilities (see the illustrative sketch after this list).
  • Pebble Implementation (pkg/dotc1z/engine/pebble/*): Implemented a full Pebble-based storage engine with key encoding, value serialization, resource management, sync lifecycle tracking, indexing, pagination, and observability.
  • Module Configuration (go.mod): Added Pebble and related dependencies.
  • SQLite Refactoring (pkg/dotc1z/engine/sqlite/*): Migrated SQLite implementations to use the new storage engine interfaces.
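
To make the interface cohort above concrete, here is a rough sketch of the shape such an interface could take, pieced together from methods referenced elsewhere in this PR; the method set and signatures are illustrative, not the actual pkg/dotc1z/engine/engine.go definitions:

import (
    "context"

    v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2"
)

// StorageEngine is an illustrative subset of what a pluggable backend exposes.
type StorageEngine interface {
    // Sync lifecycle.
    StartNewSync(ctx context.Context) (string, error)
    EndSync(ctx context.Context) error

    // Writes (variadic bulk puts, mirroring the SQLite helpers in this PR).
    PutResourceTypes(ctx context.Context, resourceTypes ...*v2.ResourceType) error
    PutResources(ctx context.Context, resources ...*v2.Resource) error
    PutAsset(ctx context.Context, assetRef *v2.AssetRef, contentType string, data []byte) error

    // Maintenance.
    Cleanup(ctx context.Context) error
    Close() error
}

AttachedStorageEngine presumably layers attach/detach-style operations on top (the DetachFile method discussed further down is one example) for working against a secondary file.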

Sequence Diagram

sequenceDiagram
    participant Client
    participant StorageEngine
    participant PebbleEngine
    participant KeyEncoder
    participant ValueCodec
    participant IndexManager

    Client->>StorageEngine: StartNewSync()
    StorageEngine->>PebbleEngine: Create Sync
    PebbleEngine->>KeyEncoder: Encode Sync Key
    PebbleEngine->>ValueCodec: Serialize Sync Metadata
    PebbleEngine->>IndexManager: Update Sync Indexes

    Client->>StorageEngine: PutResources()
    StorageEngine->>PebbleEngine: Store Resources
    PebbleEngine->>KeyEncoder: Generate Resource Keys
    PebbleEngine->>ValueCodec: Encode Resource Data
    PebbleEngine->>IndexManager: Create Secondary Indexes

    Client->>StorageEngine: EndSync()
    StorageEngine->>PebbleEngine: Finalize Sync
    PebbleEngine->>IndexManager: Commit Indexes
    PebbleEngine->>ValueCodec: Mark Sync Complete

Estimated Code Review Effort

🎯 5 (Critical) | ⏱️ ~120 minutes

The changes are extensive, touching multiple packages, introducing a new storage engine, and refactoring existing implementations. The high complexity stems from:

  • Multiple new packages and implementations
  • Complex key encoding and value serialization
  • Comprehensive indexing and sync management
  • Observability and performance considerations

Possibly Related PRs

Suggested Reviewers

  • jirwin
  • ggreer

Poem

🐰 A Pebble's Tale of Storage Grace

With keys encoded and data embraced,
A storage engine, swiftly traced,
Through sync and index, data flies,
Performance soars, efficiency rises!

Rabbit's code, a lexical dance! 🚀

@morgabra morgabra marked this pull request as draft August 22, 2025 21:14

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 39

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (7)
pkg/dotc1z/engine/sqlite/assets.go (3)

57-67: Nil AssetRef guard to prevent panics.

PutAsset dereferences assetRef without a nil check. Add validation to avoid panics and return a clear error.

Apply this diff:

 func (c *SQLite) PutAsset(ctx context.Context, assetRef *v2.AssetRef, contentType string, data []byte) error {
   ctx, span := tracer.Start(ctx, "SQLite.PutAsset")
   defer span.End()

   l := ctxzap.Extract(ctx)

+  if assetRef == nil || assetRef.Id == "" {
+    return fmt.Errorf("asset ref and id are required")
+  }

78-85: Timestamp normalization and conflict updates for metadata.

  • discovered_at is formatted without timezone and may be in local time. Prefer UTC RFC3339Nano for consistency across platforms.
  • On conflict, only data is updated; content_type and discovered_at should also update to reflect the latest write.

Apply this diff:

   fields := goqu.Record{
     "external_id":   assetRef.Id,
     "content_type":  contentType,
     "data":          data,
     "sync_id":       c.currentSyncID,
-    "discovered_at": time.Now().Format("2006-01-02 15:04:05.999999999"),
+    "discovered_at": time.Now().UTC().Format(time.RFC3339Nano),
   }

   q := c.db.Insert(assets.Name()).Prepared(true)
   q = q.Rows(fields)
-  q = q.OnConflict(goqu.DoUpdate("external_id, sync_id", goqu.C("data").Set(goqu.I("EXCLUDED.data"))))
+  q = q.OnConflict(goqu.DoUpdate("external_id, sync_id", goqu.Record{
+    "data":          goqu.I("EXCLUDED.data"),
+    "content_type":  goqu.I("EXCLUDED.content_type"),
+    "discovered_at": goqu.I("EXCLUDED.discovered_at"),
+  }))

107-121: Stronger request validation in GetAsset.

Guard against nil/empty IDs to align with other validation patterns.

Apply this diff:

 func (c *SQLite) GetAsset(ctx context.Context, request *v2.AssetServiceGetAssetRequest) (string, io.Reader, error) {
   ctx, span := tracer.Start(ctx, "SQLite.GetAsset")
   defer span.End()

   err := c.validateDb(ctx)
   if err != nil {
     return "", nil, err
   }

-  if request.Asset == nil {
-    return "", nil, fmt.Errorf("asset is required")
-  }
+  if request == nil || request.Asset == nil || request.Asset.Id == "" {
+    return "", nil, fmt.Errorf("asset and id are required")
+  }
pkg/dotc1z/engine/sqlite/diff.go (1)

14-60: Make GenerateSyncDiff atomic with a transaction.

If any table insert fails mid-way, you’ll leave a partial diff sync (and possibly an open run if endSyncRun isn’t reached). Wrap the diff generation (insertSyncRun, per-table inserts, endSyncRun) in a single write transaction for atomicity, or implement a best-effort rollback on error.

I can propose a tx-wrapped variant if you confirm the project’s preferred tx helper (goqu.WithTx vs. custom).

pkg/dotc1z/engine/sqlite/sql_helpers_test.go (1)

21-31: range over int in generateResources requires Go 1.22+

for i := range count only compiles when the module's Go language version is 1.22 or newer (range over integers); with an earlier go directive, range works only over slices, maps, strings, channels, or arrays, so this is a compile error.

Apply one of the following diffs if the module targets an older language version:

Option A (iterate by index):

- for i := range count {
+ for i := 0; i < count; i++ {

Option B (range over the pre-sized slice):

- for i := range count {
+ for i := range response {
pkg/dotc1z/engine/sqlite/grants.go (1)

137-163: Fix wrong request/response types for ListGrantsForPrincipal (compile-time break).

The method is declared with the Entitlement request/response types. This will not compile and/or will route the wrong RPC.

Apply this diff:

 func (c *SQLite) ListGrantsForPrincipal(
   ctx context.Context,
-  request *reader_v2.GrantsReaderServiceListGrantsForEntitlementRequest,
-) (*reader_v2.GrantsReaderServiceListGrantsForEntitlementResponse, error) {
+  request *reader_v2.GrantsReaderServiceListGrantsForPrincipalRequest,
+) (*reader_v2.GrantsReaderServiceListGrantsForPrincipalResponse, error) {
   ctx, span := tracer.Start(ctx, "SQLite.ListGrantsForPrincipal")
   defer span.End()
 
   objs, nextPageToken, err := c.listConnectorObjects(ctx, grants.Name(), request)
   if err != nil {
     return nil, fmt.Errorf("error listing grants for principal '%s': %w", request.GetPrincipalId(), err)
   }
 
   ret := make([]*v2.Grant, 0, len(objs))
   for _, o := range objs {
     en := &v2.Grant{}
     err = proto.Unmarshal(o, en)
     if err != nil {
       return nil, err
     }
     ret = append(ret, en)
   }
 
-  return &reader_v2.GrantsReaderServiceListGrantsForEntitlementResponse{
+  return &reader_v2.GrantsReaderServiceListGrantsForPrincipalResponse{
     List:          ret,
     NextPageToken: nextPageToken,
   }, nil
 }
pkg/dotc1z/engine/sqlite/sql_helpers.go (1)

201-216: Page-token math relies on integer IDs; make the predicate type-safe and resilient to gaps.

  • You compute nextPageToken = strconv.Itoa(lastRow + 1) and resume with id >= token. This does skip the over-fetched row, but it assumes numeric comparison. Ensure the predicate is numeric, not string, to avoid SQLite’s string coercion edge cases.

Minimal tweak: parse the token to int for Gte and keep the token as a string for transport.

- if listReq.GetPageToken() != "" {
-   q = q.Where(goqu.C("id").Gte(listReq.GetPageToken()))
- }
+ if listReq.GetPageToken() != "" {
+   tok, err := strconv.Atoi(listReq.GetPageToken())
+   if err != nil {
+     return nil, "", fmt.Errorf("invalid page token: %w", err)
+   }
+   q = q.Where(goqu.C("id").Gte(tok))
+ }

This keeps paging robust even if SQLite changes coercion behavior.

Also applies to: 259-265

github.com/aws/aws-sdk-go-v2/service/s3 v1.75.0
github.com/aws/aws-sdk-go-v2/service/sts v1.33.10
github.com/aws/smithy-go v1.22.2
github.com/cockroachdb/pebble v1.1.5

💡 Verification agent

🧩 Analysis chain

Pebble dependency looks good; please verify latest patch and vuln status

Nice addition. Before landing, sanity-check latest patch availability and known advisories for Pebble and its CockroachDB companions.

Run:


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Show available updates for key modules
go list -m -u github.com/cockroachdb/pebble
go list -m -u github.com/cockroachdb/errors
go list -m -u github.com/cockroachdb/redact

# Tidy to confirm module graph is consistent
go mod tidy -v

# Vulnerability scan (includes tests)
go run golang.org/x/vuln/cmd/govulncheck@latest ./...

Length of output: 175


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Show available updates for key CockroachDB modules, bypassing the vendor directory
go list -m -u -mod=mod github.com/cockroachdb/pebble
go list -m -u -mod=mod github.com/cockroachdb/errors
go list -m -u -mod=mod github.com/cockroachdb/redact

# Tidy module graph to confirm consistency
go mod tidy -mod=mod -v

# Run vulnerability scan
go run -mod=mod golang.org/x/vuln/cmd/govulncheck@latest ./...

Length of output: 487


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Ensure modules are enabled and bypass the vendor directory
export GO111MODULE=on

# Check for available updates for CockroachDB modules
go list -m -u github.com/cockroachdb/pebble
go list -m -u github.com/cockroachdb/errors
go list -m -u github.com/cockroachdb/redact

# Tidy the module graph to confirm consistency
go mod tidy -v

# Run vulnerability scan across the repo
go run golang.org/x/vuln/cmd/govulncheck@latest ./...

Length of output: 216


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Enable modules and bypass vendor directory for all commands
export GO111MODULE=on
export GOFLAGS="-mod=mod"

echo "=== Checking for module updates ==="
go list -m -u github.com/cockroachdb/pebble
go list -m -u github.com/cockroachdb/errors
go list -m -u github.com/cockroachdb/redact

echo "=== Tidying module graph ==="
go mod tidy -v

echo "=== Running vulnerability scan ==="
go run golang.org/x/vuln/cmd/govulncheck@latest ./...

Length of output: 15186


Action Required: Confirm Dependency and Go Runtime Updates

The Pebble module (v1.1.5) is already at its latest patch, but its CockroachDB companions have newer patch releases and there are several high-severity CVEs in the Go standard library affecting this codebase. Please address the following before landing:

  • Pebble remains at v1.1.5 (no upgrade available)
  • CockroachDB/errors: v1.11.3 → v1.12.0 (upgrade recommended)
  • CockroachDB/redact: v1.1.5 → v1.1.6 (upgrade recommended)
  • Standard Library CVEs detected by govulncheck (all present in go1.24.1):
    • GO-2025-3849 (database/sql)
    • GO-2025-3751 (net/http)
    • GO-2025-3750 (os/syscall)
    • GO-2025-3749 (crypto/x509)
    • GO-2025-3563 (net/http/internal)
      These are fixed in Go 1.24.4 and 1.24.6 respectively — please upgrade your Go toolchain in CI and local environments and re-run govulncheck.

Next steps:

  1. Bump the go.mod entries for github.com/cockroachdb/errors and github.com/cockroachdb/redact.
  2. Ensure CI and developer setups use Go ≥ 1.24.6.
  3. Re-run go mod tidy and go run golang.org/x/vuln/cmd/govulncheck@latest ./... to confirm no remaining advisories.
🤖 Prompt for AI Agents
In go.mod around line 16, the Pebble entry is fine but the CockroachDB companion
modules need version bumps and the Go runtime must be updated: change
github.com/cockroachdb/errors to v1.12.0 and github.com/cockroachdb/redact to
v1.1.6 in go.mod, update CI and developer toolchain to use Go >= 1.24.6, then
run go mod tidy and re-run govulncheck (golang.org/x/vuln/cmd/govulncheck@latest
./...) to verify the standard-library CVEs are resolved; if any indirect modules
require adjustments, run go get to pin versions before tidying and commit the
updated go.mod and go.sum.

Comment on lines +86 to +91
v2Opts := []v2.V2C1ZOption{v2.WithTmpDir(options.tmpDir), v2.WithEngine(options.engine)}
for _, p := range options.pragmas {
v2Opts = append(v2Opts, v2.WithPragma(p.name, p.value))
}

pageToken = resp.NextPageToken
return v2.NewV2C1ZFile(ctx, outputFilePath)
default:

⚠️ Potential issue

v2 options are built but not passed to NewV2C1ZFile

This drops WithTmpDir/WithEngine/WithPragma for V2.

-    return v2.NewV2C1ZFile(ctx, outputFilePath)
+    return v2.NewV2C1ZFile(ctx, outputFilePath, v2Opts...)
🤖 Prompt for AI Agents
In pkg/dotc1z/c1file.go around lines 86 to 91, the v2Opts slice is constructed
(WithTmpDir/WithEngine/WithPragma) but not passed into v2.NewV2C1ZFile(),
causing those options to be ignored; update the call to pass the options (e.g.,
v2.NewV2C1ZFile(ctx, outputFilePath, v2Opts...) or the appropriate signature) so
the built v2Opts are forwarded to the V2 constructor.

Comment on lines +639 to +654
// Create modified subset for upsert
rand.New(rand.NewSource(time.Now().UnixNano()))
upsertResources := make([]*v2.Resource, 0, upsertCount)
for i := 0; i < upsertCount; i++ {
// Pick random resource to update
originalIdx := rand.Intn(setupCount)
updatedResource := &v2.Resource{
Id: &v2.ResourceId{
ResourceType: fmt.Sprintf("upsert_type_%d", originalIdx%10),
Resource: fmt.Sprintf("upsert_resource_%d", originalIdx),
},
DisplayName: fmt.Sprintf("Updated Upsert Resource %d - %d", originalIdx, time.Now().UnixNano()),
}
upsertResources = append(upsertResources, updatedResource)
}


⚠️ Potential issue

Bug: random source created but not used; global RNG is used instead.

rand.New(...) is called and its result discarded; subsequent calls fall back to the global RNG (rand.Intn), so the explicit seeding never takes effect.

Fix by using the local RNG:

- rand.New(rand.NewSource(time.Now().UnixNano()))
- upsertResources := make([]*v2.Resource, 0, upsertCount)
+ r := rand.New(rand.NewSource(time.Now().UnixNano()))
+ upsertResources := make([]*v2.Resource, 0, upsertCount)
  for i := 0; i < upsertCount; i++ {
-   originalIdx := rand.Intn(setupCount)
+   originalIdx := r.Intn(setupCount)
    ...
  }

This also avoids unintended cross-test interference.

🤖 Prompt for AI Agents
In pkg/dotc1z/engine/benchmarks/comparison_test.go around lines 639 to 654, the
code calls rand.New(rand.NewSource(...)) but discards the result and continues
to use the global rand.Intn, so the intended local RNG/seed is never used;
create a local RNG variable (e.g., rng :=
rand.New(rand.NewSource(time.Now().UnixNano()))) and replace calls to
rand.Intn(...) with rng.Intn(...), ensuring the seeded local RNG is used to
avoid cross-test interference and non-random behavior.

Comment on lines +646 to +651
Id: &v2.ResourceId{
ResourceType: fmt.Sprintf("upsert_type_%d", originalIdx%10),
Resource: fmt.Sprintf("upsert_resource_%d", originalIdx),
},
DisplayName: fmt.Sprintf("Updated Upsert Resource %d - %d", originalIdx, time.Now().UnixNano()),
}

🛠️ Refactor suggestion

IDs don’t match setup; updates become new inserts.

Initial setup uses "upsert-type-%d" and "upsert-resource-%d", but updates use "upsert_type_%d"/"upsert_resource_%d". This prevents upsert behavior and biases results.

Align IDs:

- ResourceType: fmt.Sprintf("upsert_type_%d", originalIdx%10),
- Resource:     fmt.Sprintf("upsert_resource_%d", originalIdx),
+ ResourceType: fmt.Sprintf("upsert-type-%d", originalIdx%10),
+ Resource:     fmt.Sprintf("upsert-resource-%d", originalIdx),

Apply in both Pebble and SQLite branches.

Also applies to: 715-721

🤖 Prompt for AI Agents
In pkg/dotc1z/engine/benchmarks/comparison_test.go around lines 646-651 (and
similarly at 715-721), the ResourceId strings used for updates use
"upsert_type_%d"/"upsert_resource_%d" which do not match the initial setup keys
"upsert-type-%d"/"upsert-resource-%d", causing updates to be treated as new
inserts; change the ResourceType and Resource formatting in both the Pebble and
SQLite update branches to use "upsert-type-%d" and "upsert-resource-%d"
respectively so the IDs match the initial setup and true upsert behavior occurs.

Comment on lines +213 to +324
"comparisons": [
{
"test_name": "UpsertIfNewer_setup_500000_upsert_50000",
"pebble_ops_per_sec": 222540.4550729766,
"sqlite_ops_per_sec": 94463.82920939679,
"speedup_ratio": 1.3558271661809722,
"pebble_allocs_per_op": 9.02692,
"sqlite_allocs_per_op": 53.46754,
"alloc_ratio": -0.8311700893663706,
"alloc_percentage_diff": -83.11700893663706,
"winner": "pebble"
},
{
"test_name": "CleanupSync_100000",
"pebble_ops_per_sec": 893755.111385482,
"sqlite_ops_per_sec": 1794623308.5675316,
"speedup_ratio": -0.9995019817768337,
"pebble_allocs_per_op": 0.01054,
"sqlite_allocs_per_op": 0.00126,
"alloc_ratio": 7.365079365079365,
"alloc_percentage_diff": 736.5079365079365,
"winner": "sqlite"
},
{
"test_name": "CleanupSync_500000",
"pebble_ops_per_sec": 1264608.4234946587,
"sqlite_ops_per_sec": 6380697963.281211,
"speedup_ratio": -0.9998018071955809,
"pebble_allocs_per_op": 0.0052573333333333335,
"sqlite_allocs_per_op": 0.000252,
"alloc_ratio": 19.862433862433864,
"alloc_percentage_diff": 1986.2433862433863,
"winner": "sqlite"
},
{
"test_name": "RawInserts_500000",
"pebble_ops_per_sec": 1103217.4870244854,
"sqlite_ops_per_sec": 82413.65417528438,
"speedup_ratio": 12.386343538148042,
"pebble_allocs_per_op": 9.0035,
"sqlite_allocs_per_op": 53.461612,
"alloc_ratio": -0.8315894402884821,
"alloc_percentage_diff": -83.15894402884821,
"winner": "pebble"
},
{
"test_name": "SequentialReads_setup_100000_read_10000",
"pebble_ops_per_sec": 19951756.652414463,
"sqlite_ops_per_sec": 1721478.8398403225,
"speedup_ratio": 10.589893637185288,
"pebble_allocs_per_op": 1.3119,
"sqlite_allocs_per_op": 12.9994,
"alloc_ratio": -0.8990799575365017,
"alloc_percentage_diff": -89.90799575365017,
"winner": "pebble"
},
{
"test_name": "RandomReads_setup_100000_read_25000",
"pebble_ops_per_sec": 238000.13097623206,
"sqlite_ops_per_sec": 37246.2436697681,
"speedup_ratio": 5.389909626495066,
"pebble_allocs_per_op": 47.54084,
"sqlite_allocs_per_op": 355.00056,
"alloc_ratio": -0.8660823520954446,
"alloc_percentage_diff": -86.60823520954446,
"winner": "pebble"
},
{
"test_name": "RawInserts_100000",
"pebble_ops_per_sec": 1020858.6952915445,
"sqlite_ops_per_sec": 89735.73499063271,
"speedup_ratio": 10.376278306497511,
"pebble_allocs_per_op": 9.01203,
"sqlite_allocs_per_op": 53.46461,
"alloc_ratio": -0.8314393390319316,
"alloc_percentage_diff": -83.14393390319316,
"winner": "pebble"
},
{
"test_name": "SequentialReads_setup_100000_read_50000",
"pebble_ops_per_sec": 141827073.08632728,
"sqlite_ops_per_sec": 9035735.248620061,
"speedup_ratio": 14.696240447946627,
"pebble_allocs_per_op": 0.2612,
"sqlite_allocs_per_op": 2.59984,
"alloc_ratio": -0.8995322789094713,
"alloc_percentage_diff": -89.95322789094713,
"winner": "pebble"
},
{
"test_name": "RandomReads_setup_100000_read_10000",
"pebble_ops_per_sec": 308552.6950149455,
"sqlite_ops_per_sec": 38776.667189040076,
"speedup_ratio": 6.9571741818532455,
"pebble_allocs_per_op": 48.0175,
"sqlite_allocs_per_op": 355.0026,
"alloc_ratio": -0.8647404272532089,
"alloc_percentage_diff": -86.47404272532088,
"winner": "pebble"
},
{
"test_name": "UpsertIfNewer_setup_100000_upsert_10000",
"pebble_ops_per_sec": 238713.09579088658,
"sqlite_ops_per_sec": 126364.47251735102,
"speedup_ratio": 0.8890839413594596,
"pebble_allocs_per_op": 9.0197,
"sqlite_allocs_per_op": 53.4972,
"alloc_ratio": -0.8313986526397643,
"alloc_percentage_diff": -83.13986526397643,
"winner": "pebble"
}
],

💡 Verification agent

🧩 Analysis chain

Validate “speedup_ratio” semantics across records

Some entries show near −1 values for cleanup tests, which aligns with the README definition (relative difference). Please ensure all producers adhere to this one formula and that the README uses the same terminology to prevent reader confusion.

Use this script to recompute both arithmetic and geometric means from the file and spot inconsistencies:


🏁 Script executed:

#!/bin/bash
set -euo pipefail
f="pkg/dotc1z/engine/benchmarks/engine_comparison_results.json"

# Arithmetic mean of speedup ratios
jq '[.comparisons[].speedup_ratio] | add / length' "$f"

# Geometric mean of speedups (convert relative diff -> ratio = 1+diff; skip negatives that would make ratio <= 0)
jq '[.comparisons[]
     | (1 + .speedup_ratio)
     | select(. > 0)
    ] as $r
    | ($r | length) as $n
    | ($r | map(log) | add) / $n
    | exp' "$f"

# Category-wise means (to see which tests dominate)
jq '[.comparisons[]
     | {name: .test_name,
        cat: (if (.test_name | test("Cleanup")) then "cleanup"
              elif (.test_name | test("Reads")) then "reads"
              else "writes" end),
        diff: .speedup_ratio}] 
    | group_by(.cat) 
    | map({cat: .[0].cat, avg: (map(.diff) | add / length)})' "$f"

Length of output: 1009


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Locate README files
echo "Benchmark README files:"
find pkg/dotc1z/engine/benchmarks -maxdepth 1 -type f -iname "README*"

# Inspect definitions in benchmarks README if present
if [ -f pkg/dotc1z/engine/benchmarks/README.md ]; then
  echo -e "\nContents around 'speedup_ratio' in pkg/dotc1z/engine/benchmarks/README.md:"
  rg -n 'speedup_ratio' -C3 pkg/dotc1z/engine/benchmarks/README.md || true
  rg -n 'speedup' -C3 pkg/dotc1z/engine/benchmarks/README.md || true
fi

# Inspect root README for any references to speedup semantics
if [ -f README.md ]; then
  echo -e "\nContents around 'speedup' in root README.md:"
  rg -n 'speedup' -C3 README.md || true
fi

Length of output: 372


Add explicit speedup_ratio definition and confirm consistent calculation

Our analysis shows that all “Cleanup” entries yield speedup_ratio ≈ −0.99965 and the “Reads” and “Writes” averages are ≈ 9.4083 and ≈ 6.2519 respectively, confirming a consistent use of the relative‐difference formula:

speedup_ratio = (pebble_ops_per_sec / sqlite_ops_per_sec) - 1

However, there is no existing documentation defining speedup_ratio. To prevent future confusion:

• Add a clear definition of speedup_ratio (as shown above) to pkg/dotc1z/engine/benchmarks/README.md (or the appropriate user‐facing docs).
• Verify that every benchmark producer calculates speedup_ratio using this exact formula.
• Consider renaming the field or adding inline comments if needed to reinforce that negative values indicate SQLite outperforming Pebble.
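
A minimal sketch of that definition as producer code (the function name is illustrative):

// speedupRatio implements the relative-difference definition above:
// (pebble_ops_per_sec / sqlite_ops_per_sec) - 1. Negative values mean SQLite won.
func speedupRatio(pebbleOpsPerSec, sqliteOpsPerSec float64) float64 {
    return pebbleOpsPerSec/sqliteOpsPerSec - 1
}

// Example from the results file: CleanupSync_100000 gives
// 893755.11 / 1794623308.57 - 1 ≈ -0.9995, matching the recorded speedup_ratio.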

🤖 Prompt for AI Agents
In pkg/dotc1z/engine/benchmarks/engine_comparison_results.json around lines 213
to 324, there is no explicit definition of speedup_ratio in the repo; add a
concise entry to pkg/dotc1z/engine/benchmarks/README.md stating "speedup_ratio =
(pebble_ops_per_sec / sqlite_ops_per_sec) - 1" (and that negative values mean
SQLite is faster), then audit all benchmark producers that emit
engine_comparison_results.json to ensure they compute speedup_ratio using that
exact formula and update any producer code/comments to match (or rename the
field/add an inline comment) so future output and docs remain consistent.

Comment on lines +281 to +296
// End the sync
beforeEnd := time.Now()
err = engine.EndSync(ctx)
require.NoError(t, err)
afterEnd := time.Now()

// Verify current sync is cleared
assert.Equal(t, "", engine.getCurrentSyncID())

// Verify the sync run was updated
syncRun, err := engine.getSyncRun(ctx, syncID)
require.NoError(t, err)
assert.GreaterOrEqual(t, syncRun.EndedAt, beforeEnd.UnixNano())
assert.LessOrEqual(t, syncRun.EndedAt, afterEnd.UnixNano())
assert.Equal(t, "final-token", syncRun.SyncToken)
}

⚠️ Potential issue

Time unit inconsistency: EndedAt compared to UnixNano but elsewhere stored as Unix seconds

Other tests (e.g., conversion tests) treat StartedAt/EndedAt as Unix seconds. Here the assertions compare EndedAt against UnixNano(), which will fail if the engine stores seconds.

Align units to seconds:

- beforeEnd := time.Now()
+ beforeEnd := time.Now()
  err = engine.EndSync(ctx)
  require.NoError(t, err)
- afterEnd := time.Now()
+ afterEnd := time.Now()
@@
- assert.GreaterOrEqual(t, syncRun.EndedAt, beforeEnd.UnixNano())
- assert.LessOrEqual(t, syncRun.EndedAt, afterEnd.UnixNano())
+ assert.GreaterOrEqual(t, syncRun.EndedAt, beforeEnd.Unix())
+ assert.LessOrEqual(t, syncRun.EndedAt, afterEnd.Unix())

If the engine truly intends to store nanoseconds, adjust the other tests (and conversion) to nanoseconds instead — but be consistent across the suite.

🤖 Prompt for AI Agents
In pkg/dotc1z/engine/pebble/sync_test.go around lines 281 to 296 the test
compares syncRun.EndedAt against beforeEnd.UnixNano()/afterEnd.UnixNano() but
other tests expect StartedAt/EndedAt to be stored as Unix seconds; change the
assertions to compare against beforeEnd.Unix() and afterEnd.Unix() (or convert
syncRun.EndedAt to ns if you intend to switch the whole suite to nanoseconds),
and ensure SyncRun timestamps and all related tests/conversion helpers use the
same seconds-based unit for consistency across the test suite.

Comment on lines +35 to +43
if sr.StartedAt > 0 {
t := time.Unix(sr.StartedAt, 0)
startedAt = &t
}

if sr.EndedAt > 0 {
t := time.Unix(sr.EndedAt, 0)
endedAt = &t
}

⚠️ Potential issue

Time unit mismatch: StartedAt/EndedAt stored in nanoseconds but decoded as seconds

  • startNewSyncInternal and EndSync set UnixNano().
  • toEngineSyncRun uses time.Unix(seconds, 0), treating those nanos as seconds.
  • fromEngineSyncRun uses Unix(), writing seconds back into nanosecond fields.

This breaks sorting, pagination tokens, and “older than a week” logic.

Make all paths consistently use nanoseconds:

 func (sr *SyncRun) toEngineSyncRun() *engine.SyncRun {
   var startedAt, endedAt *time.Time

   if sr.StartedAt > 0 {
-    t := time.Unix(sr.StartedAt, 0)
+    t := time.Unix(0, sr.StartedAt)
     startedAt = &t
   }

   if sr.EndedAt > 0 {
-    t := time.Unix(sr.EndedAt, 0)
+    t := time.Unix(0, sr.EndedAt)
     endedAt = &t
   }
   ...
 }

 func fromEngineSyncRun(sr *engine.SyncRun) *SyncRun {
   psr := &SyncRun{ ... }
   if sr.StartedAt != nil {
-    psr.StartedAt = sr.StartedAt.Unix()
+    psr.StartedAt = sr.StartedAt.UnixNano()
   }
   if sr.EndedAt != nil {
-    psr.EndedAt = sr.EndedAt.Unix()
+    psr.EndedAt = sr.EndedAt.UnixNano()
   }
   return psr
 }

 // Already consistent:
 // syncRun := &SyncRun{ StartedAt: time.Now().UnixNano(), ... }
 // syncRun.EndedAt = time.Now().UnixNano()

Also review any consumers comparing raw StartedAt/EndedAt (e.g., cleanup heuristics) to ensure they use the same unit.

Also applies to: 68-70, 153-160, 269-271

🤖 Prompt for AI Agents
In pkg/dotc1z/engine/pebble/sync.go around lines 35-43 (and also at 68-70,
153-160, 269-271), the code treats StartedAt/EndedAt as seconds (using
time.Unix(sec,0)) while the rest of the code writes nanoseconds (UnixNano()),
causing unit mismatch; change decoding to time.Unix(0,
sr.StartedAt)/time.Unix(0, sr.EndedAt) so the int64 nanosecond fields are
interpreted correctly, and when encoding ensure you use t.UnixNano()
consistently; after making these changes, audit any consumers (cleanup
heuristics, pagination/sorting logic, token generation) that compare raw
StartedAt/EndedAt to ensure they expect nanoseconds and adjust comparisons or
conversions as needed.

Comment on lines +67 to +116
// unmarshalEnvelope decodes a ValueEnvelope from binary encoding.
func (vc *ValueCodec) unmarshalEnvelope(data []byte) (*ValueEnvelope, error) {
if len(data) < 16 { // minimum: 8 + 4 + 4 = 16 bytes
return nil, fmt.Errorf("envelope data too short: %d bytes", len(data))
}

buf := bytes.NewReader(data)
envelope := &ValueEnvelope{}

// Read discovered_at
err := binary.Read(buf, binary.BigEndian, &envelope.DiscoveredAt)
if err != nil {
return nil, fmt.Errorf("failed to read discovered_at: %w", err)
}

// Read content_type length and content_type
var contentTypeLen uint32
err = binary.Read(buf, binary.BigEndian, &contentTypeLen)
if err != nil {
return nil, fmt.Errorf("failed to read content_type length: %w", err)
}

if contentTypeLen > 0 {
contentTypeBytes := make([]byte, contentTypeLen)
_, err = buf.Read(contentTypeBytes)
if err != nil {
return nil, fmt.Errorf("failed to read content_type: %w", err)
}
envelope.ContentType = string(contentTypeBytes)
}

// Read data length and data
var dataLen uint32
err = binary.Read(buf, binary.BigEndian, &dataLen)
if err != nil {
return nil, fmt.Errorf("failed to read data length: %w", err)
}

if dataLen > 0 {
envelope.Data = make([]byte, dataLen)
_, err = buf.Read(envelope.Data)
if err != nil {
return nil, fmt.Errorf("failed to read data: %w", err)
}
} else {
envelope.Data = []byte{}
}

return envelope, nil
}

🛠️ Refactor suggestion

Add length bounds checks to prevent oversized allocations and use io.ReadFull.

unmarshalEnvelope trusts content lengths read from the buffer. A crafted or corrupted payload can cause large allocations before erring. Guard lengths against the remaining reader size and use io.ReadFull to ensure exact reads.

Apply this diff:

 import (
   "bytes"
   "encoding/binary"
   "fmt"
+  "io"
   "time"
 
   "google.golang.org/protobuf/proto"
 )
@@
   // Read content_type length and content_type
   var contentTypeLen uint32
   err = binary.Read(buf, binary.BigEndian, &contentTypeLen)
   if err != nil {
     return nil, fmt.Errorf("failed to read content_type length: %w", err)
   }
 
-  if contentTypeLen > 0 {
-    contentTypeBytes := make([]byte, contentTypeLen)
-    _, err = buf.Read(contentTypeBytes)
+  if contentTypeLen > 0 {
+    if int(contentTypeLen) > buf.Len() {
+      return nil, fmt.Errorf("invalid content_type length: %d > %d", contentTypeLen, buf.Len())
+    }
+    contentTypeBytes := make([]byte, contentTypeLen)
+    _, err = io.ReadFull(buf, contentTypeBytes)
     if err != nil {
       return nil, fmt.Errorf("failed to read content_type: %w", err)
     }
     envelope.ContentType = string(contentTypeBytes)
   }
@@
   // Read data length and data
   var dataLen uint32
   err = binary.Read(buf, binary.BigEndian, &dataLen)
   if err != nil {
     return nil, fmt.Errorf("failed to read data length: %w", err)
   }
 
-  if dataLen > 0 {
-    envelope.Data = make([]byte, dataLen)
-    _, err = buf.Read(envelope.Data)
+  if dataLen > 0 {
+    if int(dataLen) > buf.Len() {
+      return nil, fmt.Errorf("invalid data length: %d > %d", dataLen, buf.Len())
+    }
+    envelope.Data = make([]byte, dataLen)
+    _, err = io.ReadFull(buf, envelope.Data)
     if err != nil {
       return nil, fmt.Errorf("failed to read data: %w", err)
     }
   } else {
     envelope.Data = []byte{}
   }

Comment on lines +160 to +170
func (c *SQLiteAttached) DetachFile(dbName string) (engine.AttachedStorageEngine, error) {
_, err := c.file.db.Exec(`DETACH DATABASE ?`, dbName)
if err != nil {
return nil, err
}

return &SQLiteAttached{
safe: false,
file: c.file,
}, nil
}

⚠️ Potential issue

DetachFile leaves the current object “safe”; subsequent operations won’t short-circuit.

After DETACH, c.safe remains true on the receiver, so future calls pass the safety check but will fail deeper when the schema alias is missing. Flip the flag on the receiver and return it.

Apply this diff:

 func (c *SQLiteAttached) DetachFile(dbName string) (engine.AttachedStorageEngine, error) {
   _, err := c.file.db.Exec(`DETACH DATABASE ?`, dbName)
   if err != nil {
     return nil, err
   }

-  return &SQLiteAttached{
-    safe: false,
-    file: c.file,
-  }, nil
+  c.safe = false
+  return c, nil
 }
🤖 Prompt for AI Agents
In pkg/dotc1z/engine/sqlite/attach.go around lines 160 to 170, DetachFile
currently leaves the receiver's safe flag true and returns a new SQLiteAttached
with safe=false; update the function to set c.safe = false on the receiver
itself after successful DETACH and return the receiver (c) so subsequent
operations will short-circuit correctly when unsafe.

Comment on lines +96 to 101
func (c *SQLite) PutResourceTypes(ctx context.Context, resourceTypesObjs ...*v2.ResourceType) error {
ctx, span := tracer.Start(ctx, "SQLite.PutResourceTypes")
defer span.End()

return c.putResourceTypesInternal(ctx, bulkPutConnectorObject, resourceTypesObjs...)
}

💡 Verification agent

🧩 Analysis chain

PutResourceTypes builder is a stub; records aren’t constructed

The upsert helper is invoked with a builder that returns nil, which will either panic or write empty records depending on the implementation of bulkPutConnectorObject. This will break writing resource types.

Proposed implementation (mirror your pattern used for resources/entitlements):

 func (c *SQLite) putResourceTypesInternal(ctx context.Context, f resourceTypePutFunc, resourceTypesObjs ...*v2.ResourceType) error {
-  err := f(ctx, c, resourceTypes.Name(),
-    func(resource *v2.ResourceType) (goqu.Record, error) {
-      return nil, nil
-    },
-    resourceTypesObjs...,
-  )
+  err := f(ctx, c, resourceTypes.Name(),
+    func(rt *v2.ResourceType) (goqu.Record, error) {
+      // Marshal the protobuf payload
+      b, err := proto.Marshal(rt)
+      if err != nil {
+        return nil, fmt.Errorf("marshal resource type '%s': %w", rt.GetResourceTypeId(), err)
+      }
+      // Construct the insert/update record. Align column names with resourcesTypesTableSchema.
+      rec := goqu.Record{
+        "external_id":  rt.GetResourceTypeId(), // verify correct field name
+        "data":         b,
+        // "sync_id":    <supply from current sync>,  // follow the pattern used in other Put* helpers
+        // "discovered_at": <supply appropriate timestamp>,
+      }
+      return rec, nil
+    },
+    resourceTypesObjs...,
+  )
   if err != nil {
     return err
   }
   c.dbUpdated = true
   return nil
 }

To match the exact column mapping/timestamp/sync_id behavior, replicate the same builder used in your SQLite PutResources/PutEntitlements helpers:

Replace GetResourceTypeId() with the correct getter (e.g., GetId()) if your proto differs.

Also applies to: 110-124


🏁 Script executed:

#!/usr/bin/env bash
# Inspect other entity builders to mirror the exact schema/fields
rg -nP -C3 --type=go 'bulkPutConnectorObject\(|bulkPutConnectorObjectIfNewer\(' pkg/dotc1z/engine/sqlite

Length of output: 106


I’ll search the entire repo for the bulk helper and for the analogous PutResources/PutEntitlements implementations so we can mirror their builder logic.


🏁 Script executed:

#!/usr/bin/env bash
# 1. Find any definition or use of bulkPutConnectorObject
rg -nP 'bulkPutConnectorObject' -n -C2 .

# 2. Locate the SQLite PutResources method and its internal builder
rg -nP 'func \(c \*SQLite\) PutResources' -n -C3 .

# 3. Locate the SQLite PutEntitlements method and its internal builder
rg -nP 'func \(c \*SQLite\) PutEntitlements' -n -C3 .

Length of output: 6319


I’ll inspect the existing builders for resources and entitlements to mirror their record construction.


🏁 Script executed:

#!/usr/bin/env bash
# Show the implementation of putResourcesInternal and its builder
rg -nP 'func \(c \*SQLite\) putResourcesInternal' -n -C20 pkg/dotc1z/engine/sqlite/resources.go

# Show the implementation of putEntitlementsInternal and its builder
rg -nP 'func \(c \*SQLite\) putEntitlementsInternal' -n -C20 pkg/dotc1z/engine/sqlite/entitlements.go

Length of output: 3218


I’ll inspect the stubbed builder in resouce_types.go to confirm it returns nil records:


🏁 Script executed:

#!/usr/bin/env bash
# Inspect the stub implementation of putResourceTypesInternal
rg -nP 'func \(c \*SQLite\) putResourceTypesInternal' -n -C20 pkg/dotc1z/engine/sqlite/resouce_types.go

Length of output: 1307


Implement ResourceTypes builder – stub currently returns nil record

The current putResourceTypesInternal invokes the upsert helper with a builder that unconditionally returns (nil, nil), causing panics or empty‐row inserts. You must mirror the concrete builders used in PutResources and PutEntitlements, constructing a valid goqu.Record for each *v2.ResourceType.

Locations to fix:

  • pkg/dotc1z/engine/sqlite/resouce_types.go: lines 112–118 – stubbed builder returning nil.

Suggested diff (adjust getters/columns to match your resource_types schema):

 func (c *SQLite) putResourceTypesInternal(ctx context.Context, f resourceTypePutFunc, resourceTypesObjs ...*v2.ResourceType) error {
-   err := f(ctx, c, resourceTypes.Name(),
-       func(resource *v2.ResourceType) (goqu.Record, error) {
-           return nil, nil
-       },
-       resourceTypesObjs...,
-   )
+   err := f(ctx, c, resourceTypes.Name(),
+       func(rt *v2.ResourceType) (goqu.Record, error) {
+           // Serialize the ResourceType proto
+           data, err := proto.Marshal(rt)
+           if err != nil {
+               return nil, fmt.Errorf("marshal ResourceType '%s': %w", rt.GetResourceTypeId(), err)
+           }
+           // Construct the upsert record – align with your table’s columns
+           rec := goqu.Record{
+               "external_id":    rt.GetResourceTypeId(),    // verify correct getter
+               "data":           data,
+               // "sync_id":      c.currentSyncID,          // if you track sync sessions
+               // "discovered_at": rt.GetDiscoveredAt(),     // or time.Now().UTC(), per policy
+           }
+           return rec, nil
+       },
+       resourceTypesObjs...,
+   )
    if err != nil {
      return err
    }
    c.dbUpdated = true
    return nil
 }

• Confirm the exact column names and any additional fields (e.g. sync_id, discovered_at) by inspecting your resource_types table schema and the builders in putResourcesInternal/putEntitlementsInternal.
• Update import statements (google.golang.org/protobuf/proto for proto, github.com/doug-martin/goqu/v9 for goqu) if needed to match your module paths.

