🎯 Objective
Implement etcd storage backend for the v2 storage layer, completing the trilogy of storage implementations (in-memory, PostgreSQL, and etcd) with a unified, type-safe interface.
📋 Background
The v2 storage layer introduced a generic IRepository[T] interface that allows multiple storage backends to be implemented in a pluggable fashion. We've already implemented:
- ✅ In-memory: Map-based storage for development and testing
- ✅ PostgreSQL: SQL database for production deployments
Now we need etcd to support:
- Kubernetes-native deployments - etcd is the standard K8s storage backend
- Distributed consensus - Built-in consistency and fault tolerance
- Watch capabilities - Real-time updates (future enhancement)
- Cloud-native architecture - Perfect for containerized environments
🏗️ Architecture
Current State
pkg/v2/storage/
├── IRepository.go # Generic interface
├── filter.go # Query filtering
├── errors.go # Error types
├── inmemory/ # In-memory implementation ✅
│ └── ...
└── postgres/ # PostgreSQL implementation ✅
└── ...
Proposed State
pkg/v2/storage/
├── IRepository.go # Generic interface
├── filter.go # Query filtering
├── errors.go # Error types
├── inmemory/ # In-memory implementation ✅
│ └── ...
├── postgres/ # PostgreSQL implementation ✅
│ └── ...
└── etcd/ # Etcd implementation ✅ (NEW)
├── repository.go # Core struct with etcd client
├── create.go # Strict create semantics
├── retrieve.go # Get/List with prefix queries
├── update.go # Strict update semantics
└── delete.go # Delete with verification
🔧 Implementation Details
Data Structure
```go
type EtcdRepository[T metav1.Object] struct {
	mu           sync.Mutex              // Thread-safe operations
	client       clientv3.KV             // etcd KV client
	gvk          schema.GroupVersionKind
	gr           schema.GroupResource
	namespaced   bool
	resourceType string
}
```
Key Generation Strategy
Namespaced Resources
Format: "group/version/kind/namespace/name"
Example: "wgpolicyk8s.io/v1alpha2/PolicyReport/default/my-report"
Cluster-Scoped Resources
Format: "group/version/kind/name"
Example: "wgpolicyk8s.io/v1alpha2/ClusterPolicyReport/my-cluster-report"
Key Features
- Unified Repository - Single implementation handles both namespaced and cluster-scoped resources
- Prefix Queries - Efficient listing using etcd's prefix search
- Strict Semantics - Follows Kubernetes API conventions
- Protection Logic - Ignores namespace parameter for cluster-scoped resources
- JSON Storage - Entire object (including ObjectMeta with ResourceVersion) stored as JSON (see the List sketch below)
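A rough sketch of how the prefix query and JSON storage pieces could combine in List, assuming the listPrefix helper sketched above and the Filter shape used in the usage example later in this issue (storage refers to the pkg/v2/storage package; the real IRepository[T] signature may differ):

```go
import (
	"context"
	"encoding/json"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// List fetches every key under the computed prefix in one range request and
// decodes each stored JSON value back into T.
func (r *EtcdRepository[T]) List(ctx context.Context, filter storage.Filter) ([]T, error) {
	resp, err := r.client.Get(ctx, r.listPrefix(filter.Namespace), clientv3.WithPrefix())
	if err != nil {
		return nil, err
	}

	results := make([]T, 0, len(resp.Kvs))
	for _, kv := range resp.Kvs {
		var obj T
		if err := json.Unmarshal(kv.Value, &obj); err != nil {
			return nil, err
		}
		results = append(results, obj)
	}
	return results, nil
}
```

Because every key of a kind shares the group/version/kind prefix, a single clientv3.WithPrefix() range covers both kind-wide and namespace-scoped listing.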
Thread Safety
Uses sync.Mutex (not RWMutex) because:
- etcd client handles concurrent operations internally
- We need exclusive access for consistency checks (existence check before create/update/delete)
- Simple locking model appropriate for network-bound operations (see the Create sketch below)
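For illustration, a Create sketch under those assumptions. It relies on the hypothetical objectKey helper from the key-generation section and uses apimachinery's errors package for the already-exists error (a guess based on the gr schema.GroupResource field; whatever errors.go actually defines should be used instead):

```go
import (
	"context"
	"encoding/json"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
)

// Create stores a new object and fails if the key already exists. The
// existence check and the Put are guarded by the mutex so that two concurrent
// Create calls for the same key cannot both succeed.
func (r *EtcdRepository[T]) Create(ctx context.Context, obj T) error {
	r.mu.Lock()
	defer r.mu.Unlock()

	key := r.objectKey(obj.GetNamespace(), obj.GetName())

	// Strict create semantics: the object must not already exist.
	resp, err := r.client.Get(ctx, key)
	if err != nil {
		return err
	}
	if resp.Count > 0 {
		return apierrors.NewAlreadyExists(r.gr, obj.GetName())
	}

	// JSON storage: the whole object, including ObjectMeta, is serialized.
	data, err := json.Marshal(obj)
	if err != nil {
		return err
	}
	_, err = r.client.Put(ctx, key, string(data))
	return err
}
```

The read-then-write pair is what the mutex protects; the etcd calls themselves are already safe to issue concurrently.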
🎁 Benefits
For Kubernetes-Native Deployments
- ✅ Uses same storage backend as Kubernetes itself
- ✅ Consistent with K8s architecture patterns
- ✅ Built-in distributed consensus (Raft)
- ✅ Automatic leader election and failover
For Distributed Systems
- ✅ Strong consistency guarantees
- ✅ Watch support for real-time updates (future)
- ✅ Transaction support (future)
- ✅ TTL and lease management (future)
For Development
- ✅ Easy to run locally (single etcd container)
- ✅ No complex database setup
- ✅ Can share etcd cluster with K8s
🔄 Comparison with Other Backends
| Feature | In-Memory | PostgreSQL | Etcd |
|---|---|---|---|
| Persistence | No | Yes | Yes |
| Distributed | No | Via replicas | Built-in |
| Concurrency | RWMutex | Connection pool | Mutex + etcd |
| Setup | None | Database + schema | etcd cluster |
| Dependencies | Zero | PostgreSQL | etcd |
| Key Format | ns/name | SQL rows | group/version/kind/ns/name |
| Consistency | Single node | ACID | Raft consensus |
| Watch Support | No | LISTEN/NOTIFY | Native watches |
| Best For | Dev/Test | Production SQL | K8s-native |
✅ Acceptance Criteria
- Implement EtcdRepository[T] struct with etcd client
- Implement all IRepository[T] methods (Create, Get, List, Update, Delete)
- Use sync.Mutex for operation consistency
- Follow strict Kubernetes API semantics
- Handle both namespaced and cluster-scoped resources
- Proper key generation with GVK prefix
- Prefix queries for efficient listing
- No linter errors
- Code builds successfully
- Add unit tests
- Add integration tests with real etcd
- Performance benchmarks vs original implementation
- Documentation updates
📚 Example Usage
```go
// Create etcd client
client, err := clientv3.New(clientv3.Config{
	Endpoints: []string{"localhost:2379"},
})

// Create repository for PolicyReports (namespaced)
repo := etcd.NewEtcdRepository[*v1alpha2.PolicyReport](
	client,
	schema.GroupVersionKind{
		Group:   "wgpolicyk8s.io",
		Version: "v1alpha2",
		Kind:    "PolicyReport",
	},
	schema.GroupResource{
		Group:    "wgpolicyk8s.io",
		Resource: "policyreports",
	},
	"PolicyReport",
	true, // namespaced
)

// CRUD operations
report := &v1alpha2.PolicyReport{}
report.SetName("my-report")
report.SetNamespace("default")

err = repo.Create(ctx, report)
report, err = repo.Get(ctx, storage.NewFilter("my-report", "default"))
reports, err := repo.List(ctx, storage.Filter{Namespace: "default"})
err = repo.Update(ctx, report)
err = repo.Delete(ctx, storage.NewFilter("my-report", "default"))
```
🔍 Technical Decisions
Why Mutex Instead of RWMutex?
The original v1 etcd implementation used sync.Mutex:
```go
type objectStoreNamespaced[T metav1.Object] struct {
	sync.Mutex              // ← uses a simple Mutex
	etcdclient clientv3.KV
	// ...
}
```
Reasons:
- The etcd client is already concurrent-safe
- All operations need a Get() before writing, for consistency (see the Delete sketch below)
- Network I/O dominates, so lock contention is minimal
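As a concrete example of that pattern, a Delete-with-verification sketch (same caveats as the earlier sketches: objectKey is a hypothetical helper, storage.Filter field names are assumed from the usage example above, and apierrors stands in for whatever errors.go provides):

```go
import (
	"context"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
)

// Delete verifies that the object exists before removing it, mirroring the
// strict semantics used by Create and Update.
func (r *EtcdRepository[T]) Delete(ctx context.Context, filter storage.Filter) error {
	r.mu.Lock()
	defer r.mu.Unlock()

	key := r.objectKey(filter.Namespace, filter.Name)

	// Verification step: deleting a missing object is an error.
	resp, err := r.client.Get(ctx, key)
	if err != nil {
		return err
	}
	if resp.Count == 0 {
		return apierrors.NewNotFound(r.gr, filter.Name)
	}

	_, err = r.client.Delete(ctx, key)
	return err
}
```

Since every operation is such a read-modify-write sequence, the reader/writer split of an RWMutex would buy little.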
Why Full GVK in Key?
Follows etcd/Kubernetes conventions:
- Clear namespace separation
- Easy to list all resources of a kind
- Natural hierarchy for prefix queries
- Compatible with future multi-cluster scenarios
🔮 Future Enhancements
Watch Support
```go
// Future API
watchChan := repo.Watch(ctx, filter)
for event := range watchChan {
	switch event.Type {
	case watch.Added, watch.Modified, watch.Deleted:
		handleEvent(event.Object)
	}
}
```
Transactions
```go
// Future API for atomic operations
err := repo.Transaction(ctx, func(txn Transaction[T]) error {
	obj1, _ := txn.Get(filter1)
	obj2, _ := txn.Get(filter2)
	txn.Update(obj1)
	txn.Update(obj2)
	return nil
})
```
🐛 Differences from v1
v1 Implementation
- Separate stores: ObjectStorageNamespaced and ObjectStorageCluster
- Cluster wrapper always passed namespace=""
- Key check: if len(namespace) != 0
v2 Implementation
- Unified repository for both types
- Single namespaced flag
- Key check: if r.namespaced && namespace != ""
- More robust: protects against namespace misuse