diff --git a/docs/using/etcd-reconfiguration.md b/docs/using/etcd-reconfiguration.md new file mode 100644 index 00000000..10fb69d8 --- /dev/null +++ b/docs/using/etcd-reconfiguration.md @@ -0,0 +1,146 @@ +# Etcd Mode Reconfiguration + + +This guide explains how to change a Control Plane host's etcd mode after cluster initialization. + + +## Overview + + +The Control Plane supports two etcd modes: + + +- **Server mode**: Runs an embedded etcd server and participates as a voting member +- **Client mode**: Connects to the etcd cluster as a client only + + +**Recommended topology:** +- 1-3 hosts: All should be etcd servers +- 4-7 hosts: 3 etcd servers, rest as clients +- 8+ hosts: 5 etcd servers, rest as clients + + +!!! warning "Maintain Odd Numbers" + Run an **odd number** of etcd servers (3 or 5): an even count adds a voting member without tolerating any additional failures. + + +## How It Works + + +Etcd mode reconfiguration is **fully automatic**: + + +1. Stop the container +2. Update the `PGEDGE_ETCD_MODE` environment variable +3. Restart the container +4. The system automatically handles all cluster operations + + +**What happens automatically:** +- **Client→Server**: Discovers cluster, obtains credentials, joins as voting member +- **Server→Client**: Removes itself from membership, transitions to client mode + + +No manual API calls or configuration needed! + + +## Procedures + + +### Promoting a Client to Server (Example - host-4) + + +```bash +# 1. Stop the container +docker stop control-plane-host-4 + + +# 2. Update docker-compose.yaml environment: +PGEDGE_ETCD_MODE: server # was: client + + +# 3. Restart +docker-compose up -d host-4 + + +# 4. Verify (check logs) +docker logs control-plane-host-4 +``` + + +### Demoting a Server to Client (Example - host-4) + + +!!! warning "Quorum Check" + Ensure at least 2 other healthy servers remain before demotion. + + +```bash +# 1. Stop the container +docker stop control-plane-host-4 + + +# 2. 
Update docker-compose.yaml environment: +PGEDGE_ETCD_MODE: client # was: server + + +# 3. Restart +docker-compose up -d host-4 + + +# 4. Verify (check logs) +docker logs control-plane-host-4 +``` + + +## Troubleshooting + + +### Promotion Issues + + +**Problem**: Host fails to join cluster +**Solution**: Check logs for connection errors. Verify network connectivity and that other hosts are healthy. + + +**Problem**: "Permission denied" errors +**Solution**: The system obtains new credentials automatically. If the issue persists, check that RBAC is enabled on the cluster. + + +### Demotion Issues + + +**Problem**: Host fails to remove itself from membership +**Solution**: Check that the remaining servers have quorum. The system continues the transition even if removal fails; if the old member is still listed afterwards, remove it manually with `etcdctl member remove`. + + +**Problem**: Old data directory persists +**Solution**: The system cleans up the etcd data directory automatically. If it persists, remove it manually after verifying that the host has transitioned. + + +### General Troubleshooting + + +Check cluster health: + + +```bash +docker exec control-plane-host-1 etcdctl member list +``` + + +Every member should be listed with status `started`. + + +## Best Practices + + +- **Change one host at a time** - Wait for completion before reconfiguring another +- **Monitor cluster health** - Verify all servers are healthy before/after changes +- **Maintain odd numbers** - Always keep 3 or 5 etcd servers, never 2 or 4 + +## Summary + + +Etcd mode reconfiguration is fully automatic - just update the environment variable and restart. The Control Plane handles all cluster operations including credential provisioning, membership changes, and configuration updates without manual intervention. 
+ diff --git a/server/internal/api/apiv1/pre_init_handlers.go b/server/internal/api/apiv1/pre_init_handlers.go index 00fb55a8..af563f10 100644 --- a/server/internal/api/apiv1/pre_init_handlers.go +++ b/server/internal/api/apiv1/pre_init_handlers.go @@ -4,17 +4,14 @@ import ( "context" "crypto/tls" "crypto/x509" - "encoding/base64" "fmt" "net/http" "net/url" "os" "github.com/google/uuid" - goahttp "goa.design/goa/v3/http" api "github.com/pgEdge/control-plane/api/apiv1/gen/control_plane" - "github.com/pgEdge/control-plane/api/apiv1/gen/http/control_plane/client" "github.com/pgEdge/control-plane/server/internal/cluster" "github.com/pgEdge/control-plane/server/internal/config" "github.com/pgEdge/control-plane/server/internal/etcd" @@ -93,20 +90,15 @@ func (s *PreInitHandlers) JoinCluster(ctx context.Context, token *api.ClusterJoi return ErrInvalidServerURL } - http_client, err := s.GetClient() - + httpClient, err := s.GetClient() if err != nil { return err } - enc := goahttp.RequestEncoder - dec := goahttp.ResponseDecoder //make our own - c := client.NewClient(serverURL.Scheme, serverURL.Host, http_client, enc, dec, false) - cli := &api.Client{ - GetJoinOptionsEndpoint: c.GetJoinOptions(), - } + // Use shared API client creation utility + apiClient := etcd.CreateAPIClient(serverURL, httpClient) - opts, err := cli.GetJoinOptions(ctx, &api.ClusterJoinRequest{ + opts, err := apiClient.GetJoinOptions(ctx, &api.ClusterJoinRequest{ HostID: api.Identifier(s.cfg.HostID), Hostname: s.cfg.Hostname, Ipv4Address: s.cfg.IPv4Address, @@ -117,43 +109,13 @@ func (s *PreInitHandlers) JoinCluster(ctx context.Context, token *api.ClusterJoi return apiErr(err) } - caCert, err := base64.StdEncoding.DecodeString(opts.Credentials.CaCert) - if err != nil { - return apiErr(fmt.Errorf("failed to decode CA certificate: %w", err)) - } - clientCert, err := base64.StdEncoding.DecodeString(opts.Credentials.ClientCert) - if err != nil { - return apiErr(fmt.Errorf("failed to decode client certificate: 
%w", err)) - } - clientKey, err := base64.StdEncoding.DecodeString(opts.Credentials.ClientKey) - if err != nil { - return apiErr(fmt.Errorf("failed to decode client key: %w", err)) - } - serverCert, err := base64.StdEncoding.DecodeString(opts.Credentials.ServerCert) - if err != nil { - return apiErr(fmt.Errorf("failed to decode server certificate: %w", err)) - } - serverKey, err := base64.StdEncoding.DecodeString(opts.Credentials.ServerKey) + // Decode credentials using shared utility + joinOptions, err := etcd.DecodeJoinCredentials(opts) if err != nil { - return apiErr(fmt.Errorf("failed to decode server key: %w", err)) + return apiErr(err) } - err = s.etcd.Join(ctx, etcd.JoinOptions{ - Leader: &etcd.ClusterMember{ - Name: opts.Leader.Name, - PeerURLs: opts.Leader.PeerUrls, - ClientURLs: opts.Leader.ClientUrls, - }, - Credentials: &etcd.HostCredentials{ - Username: opts.Credentials.Username, - Password: opts.Credentials.Password, - CaCert: caCert, - ClientCert: clientCert, - ClientKey: clientKey, - ServerCert: serverCert, - ServerKey: serverKey, - }, - }) + err = s.etcd.Join(ctx, *joinOptions) if err != nil { return apiErr(fmt.Errorf("failed to join existing cluster: %w", err)) } diff --git a/server/internal/etcd/embedded.go b/server/internal/etcd/embedded.go index 85166a80..e5bc414b 100644 --- a/server/internal/etcd/embedded.go +++ b/server/internal/etcd/embedded.go @@ -53,6 +53,10 @@ func (e *EmbeddedEtcd) Start(ctx context.Context) error { e.mu.Lock() defer e.mu.Unlock() + if e.etcd != nil { + return nil // already started + } + initialized, err := e.IsInitialized() if err != nil { return err @@ -292,6 +296,7 @@ func (e *EmbeddedEtcd) Shutdown() error { } if e.etcd != nil { e.etcd.Close() + e.etcd = nil } return errors.Join(errs...) 
} diff --git a/server/internal/etcd/provide.go b/server/internal/etcd/provide.go index 6ce96b73..7d37634e 100644 --- a/server/internal/etcd/provide.go +++ b/server/internal/etcd/provide.go @@ -1,7 +1,9 @@ package etcd import ( + "context" "fmt" + "time" "github.com/rs/zerolog" "github.com/samber/do" @@ -27,6 +29,18 @@ func provideClient(i *do.Injector) { }) } +// newEtcdForMode creates an Etcd instance based on the specified mode. +func newEtcdForMode(mode config.EtcdMode, cfg *config.Manager, logger zerolog.Logger) (Etcd, error) { + switch mode { + case config.EtcdModeServer: + return NewEmbeddedEtcd(cfg, logger), nil + case config.EtcdModeClient: + return NewRemoteEtcd(cfg, logger), nil + default: + return nil, fmt.Errorf("invalid etcd mode: %s", mode) + } +} + func provideEtcd(i *do.Injector) { do.Provide(i, func(i *do.Injector) (Etcd, error) { cfg, err := do.Invoke[*config.Manager](i) @@ -38,13 +52,47 @@ func provideEtcd(i *do.Injector) { return nil, err } - switch storageType := cfg.Config().EtcdMode; storageType { - case config.EtcdModeServer: - return NewEmbeddedEtcd(cfg, logger), nil - case config.EtcdModeClient: - return NewRemoteEtcd(cfg, logger), nil + appCfg := cfg.Config() + generated := cfg.GeneratedConfig() + + oldMode := generated.EtcdMode + newMode := appCfg.EtcdMode + + logger.Info(). + Str("old_mode", string(oldMode)). + Str("new_mode", string(newMode)). + Bool("old_mode_empty", oldMode == ""). + Bool("modes_equal", oldMode == newMode). + Msg("checking etcd mode for reconfiguration") + + // First startup (no generated config yet) or no change: use the configured mode. + if oldMode == "" || oldMode == newMode { + logger.Info(). + Str("mode", string(newMode)). + Bool("first_startup", oldMode == ""). + Msg("creating new etcd instance for mode (no reconfiguration needed)") + return newEtcdForMode(newMode, cfg, logger) + } + + // Mode has changed - perform reconfiguration. 
+ ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + logger.Info(). + Str("host_id", appCfg.HostID). + Str("old_mode", string(oldMode)). + Str("new_mode", string(newMode)). + Msg("detected etcd_mode change, performing reconfiguration") + + switch { + case oldMode == config.EtcdModeServer && newMode == config.EtcdModeClient: + return reconfigureServerToClient(ctx, cfg, logger) + + case oldMode == config.EtcdModeClient && newMode == config.EtcdModeServer: + return reconfigureClientToServer(ctx, cfg, logger) + default: - return nil, fmt.Errorf("invalid storage type: %s", storageType) + return nil, fmt.Errorf("unsupported etcd mode transition: %s -> %s", oldMode, newMode) } }) } diff --git a/server/internal/etcd/rbac.go b/server/internal/etcd/rbac.go index bb2b26ae..1648619c 100644 --- a/server/internal/etcd/rbac.go +++ b/server/internal/etcd/rbac.go @@ -353,6 +353,8 @@ func writeHostCredentials(creds *HostCredentials, cfg *config.Manager) error { generatedCfg := cfg.GeneratedConfig() generatedCfg.EtcdUsername = creds.Username generatedCfg.EtcdPassword = creds.Password + generatedCfg.EtcdMode = appCfg.EtcdMode + if err := cfg.UpdateGeneratedConfig(generatedCfg); err != nil { return fmt.Errorf("failed to update generated config: %w", err) } diff --git a/server/internal/etcd/reconfigure.go b/server/internal/etcd/reconfigure.go new file mode 100644 index 00000000..20904bdd --- /dev/null +++ b/server/internal/etcd/reconfigure.go @@ -0,0 +1,500 @@ +package etcd + +import ( + "bytes" + "compress/gzip" + "context" + "encoding/base64" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "net/url" + "os" + "path/filepath" + "strings" + "time" + + "github.com/rs/zerolog" + clientv3 "go.etcd.io/etcd/client/v3" + goahttp "goa.design/goa/v3/http" + + api "github.com/pgEdge/control-plane/api/apiv1/gen/control_plane" + "github.com/pgEdge/control-plane/api/apiv1/gen/http/control_plane/client" + 
"github.com/pgEdge/control-plane/server/internal/config" +) + +// DecodeJoinCredentials decodes base64-encoded credentials from the API join response +// and converts them to etcd JoinOptions. This is a shared utility used by both +// automatic reconfiguration and the JoinCluster API. +func DecodeJoinCredentials(joinOpts *api.ClusterJoinOptions) (*JoinOptions, error) { + // Decode all certificate and key data + caCert, err := base64.StdEncoding.DecodeString(joinOpts.Credentials.CaCert) + if err != nil { + return nil, fmt.Errorf("failed to decode CA certificate: %w", err) + } + + clientCert, err := base64.StdEncoding.DecodeString(joinOpts.Credentials.ClientCert) + if err != nil { + return nil, fmt.Errorf("failed to decode client certificate: %w", err) + } + + clientKey, err := base64.StdEncoding.DecodeString(joinOpts.Credentials.ClientKey) + if err != nil { + return nil, fmt.Errorf("failed to decode client key: %w", err) + } + + serverCert, err := base64.StdEncoding.DecodeString(joinOpts.Credentials.ServerCert) + if err != nil { + return nil, fmt.Errorf("failed to decode server certificate: %w", err) + } + + serverKey, err := base64.StdEncoding.DecodeString(joinOpts.Credentials.ServerKey) + if err != nil { + return nil, fmt.Errorf("failed to decode server key: %w", err) + } + + // Create JoinOptions for embedded etcd + return &JoinOptions{ + Leader: &ClusterMember{ + Name: joinOpts.Leader.Name, + PeerURLs: joinOpts.Leader.PeerUrls, + ClientURLs: joinOpts.Leader.ClientUrls, + }, + Credentials: &HostCredentials{ + Username: joinOpts.Credentials.Username, + Password: joinOpts.Credentials.Password, + CaCert: caCert, + ClientCert: clientCert, + ClientKey: clientKey, + ServerCert: serverCert, + ServerKey: serverKey, + }, + }, nil +} + +// CreateAPIClient creates a goa API client for the Control Plane API. +// This is a shared utility used by both automatic reconfiguration and the JoinCluster API. 
+func CreateAPIClient(serverURL *url.URL, httpClient *http.Client) *api.Client { + enc := goahttp.RequestEncoder + dec := goahttp.ResponseDecoder + c := client.NewClient(serverURL.Scheme, serverURL.Host, httpClient, enc, dec, false) + + return &api.Client{ + GetJoinTokenEndpoint: c.GetJoinToken(), + GetJoinOptionsEndpoint: c.GetJoinOptions(), + } +} + +// reconfigureServerToClient handles the transition from server mode to client mode. +// It removes the host from the etcd cluster membership and configures it as a remote client. +func reconfigureServerToClient( + ctx context.Context, + cfg *config.Manager, + logger zerolog.Logger, +) (Etcd, error) { + appCfg := cfg.Config() + generated := cfg.GeneratedConfig() + + logger.Info().Msg("starting server->client reconfiguration") + + // Check if embedded etcd was ever initialized + embedded := NewEmbeddedEtcd(cfg, logger) + initialized, err := embedded.IsInitialized() + if err != nil { + return nil, fmt.Errorf("failed to check embedded etcd initialization during server->client transition: %w", err) + } + + // If etcd was never initialized, there's nothing to demote – just persist + // the new mode and come up as a client. + if !initialized { + logger.Info().Msg("embedded etcd not initialized, skipping server->client demotion") + + generated.EtcdMode = appCfg.EtcdMode + if err := cfg.UpdateGeneratedConfig(generated); err != nil { + return nil, fmt.Errorf("failed to update generated config for server->client (uninitialized) transition: %w", err) + } + + return NewRemoteEtcd(cfg, logger), nil + } + + // Connect to cluster using existing credentials + logger.Info().Msg("getting cluster member list to find remote endpoints") + + // Create a temporary client connection using the existing server credentials + localClientURLs := []string{fmt.Sprintf("https://%s:%d", appCfg.IPv4Address, appCfg.EtcdServer.ClientPort)} + + clientCfg, err := clientConfig(appCfg, logger, localClientURLs...) 
+ if err != nil { + return nil, fmt.Errorf("failed to create client config for server->client transition: %w", err) + } + + client, err := clientv3.New(clientCfg) + if err != nil { + return nil, fmt.Errorf("failed to connect to local etcd for server->client transition: %w", err) + } + defer client.Close() + + // Snapshot the membership before removing this host. NOTE(review): this dials this host's own client URL, and nothing in this function starts the embedded server - confirm one is listening at this point. + resp, err := client.MemberList(ctx) + if err != nil { + return nil, fmt.Errorf("failed to list etcd members for server->client transition: %w", err) + } + + var endpoints []string + for _, m := range resp.Members { + // Skip this host's member (matched by name == HostID); we are about to remove it. + if m.Name == appCfg.HostID { + continue + } + endpoints = append(endpoints, m.ClientURLs...) + } + + if len(endpoints) == 0 { + return nil, fmt.Errorf("cannot demote etcd server on host %s: no remaining cluster members with client URLs", appCfg.HostID) + } + + // Remove this host's etcd member from the cluster (best-effort: a failure is only logged) + logger.Info().Msg("removing this host from etcd cluster") + if err := RemoveMember(ctx, client, appCfg.HostID); err != nil { + logger.Warn().Err(err).Msg("failed to remove this host via direct etcd API") + // NOTE(review): etcd does not drop unreachable members by itself; if removal failed, a stale voting member stays in the cluster and still counts toward quorum - confirm something removes it later. + logger.Warn().Msg("continuing with server->client transition - cluster will detect member loss") + } + + // Remove the etcd data directory since we're no longer a server (runs even if the removal above failed) + etcdDataDir := filepath.Join(appCfg.DataDir, "etcd") + logger.Info(). + Str("etcd_data_dir", etcdDataDir). + Msg("removing etcd data directory for server->client transition") + + if err := os.RemoveAll(etcdDataDir); err != nil { + logger.Warn(). + Err(err). + Str("etcd_data_dir", etcdDataDir). 
+ Msg("failed to remove etcd data directory - continuing anyway") + } + + // Persist new mode + remote endpoints + generated.EtcdMode = appCfg.EtcdMode + generated.EtcdClient.Endpoints = endpoints + if err := cfg.UpdateGeneratedConfig(generated); err != nil { + return nil, fmt.Errorf("failed to update generated config after server->client transition: %w", err) + } + + logger.Info(). + Strs("endpoints", endpoints). + Msg("completed etcd server->client transition; using remaining cluster members as remote endpoints") + + // Return a new RemoteEtcd client for the demoted host + return NewRemoteEtcd(cfg, logger), nil +} + +// reconfigureClientToServer handles the transition from client mode to server mode. +// It joins the host to the etcd cluster as an embedded server member. +func reconfigureClientToServer( + ctx context.Context, + cfg *config.Manager, + logger zerolog.Logger, +) (Etcd, error) { + appCfg := cfg.Config() + + logger.Info(). + Str("host_id", appCfg.HostID). + Msg("starting client->server reconfiguration") + + // Connect to the existing cluster as a remote client using persisted credentials. + remote := NewRemoteEtcd(cfg, logger) + + logger.Info().Msg("starting remote etcd client with existing credentials") + if err := remote.Start(ctx); err != nil { + // Authentication failed - credentials are invalid + logger.Error(). + Err(err). 
+ Msg("failed to authenticate with existing credentials - cannot automatically recover without valid credentials") + + // Reset the generated config; clearGeneratedConfig keeps only the desired etcd mode + if clearErr := clearGeneratedConfig(cfg, logger); clearErr != nil { + logger.Error().Err(clearErr).Msg("failed to clear generated config") + } + + // Fall back to a RemoteEtcd with a nil error even though server mode was requested - manual intervention required + logger.Info().Msg("returning to pre-initialization mode - use JoinCluster API to rejoin") + return NewRemoteEtcd(cfg, logger), nil + } + + logger.Info().Msg("remote etcd client started, querying cluster information") + + // Get the etcd client for cluster queries. NOTE(review): remote has been started by now, but the failure branch below returns without remote.Shutdown(), unlike the AddHost and Leader failure paths - looks like a leak; confirm. + client, err := remote.GetClient() + if err != nil { + logger.Error().Err(err).Msg("failed to get etcd client") + if clearErr := clearGeneratedConfig(cfg, logger); clearErr != nil { + logger.Error().Err(clearErr).Msg("failed to clear generated config") + } + return NewRemoteEtcd(cfg, logger), nil + } + + // Collect the other hosts' HTTP endpoints now, while the etcd client is usable, for a potential automatic rejoin + httpEndpoints := collectHTTPEndpointsFromHosts(ctx, client, appCfg.HostID, logger) + + logger.Info().Msg("requesting server credentials via AddHost") + + // Request server credentials from the cluster + creds, err := remote.AddHost(ctx, HostCredentialOptions{ + HostID: appCfg.HostID, + Hostname: appCfg.Hostname, + IPv4Address: appCfg.IPv4Address, + EmbeddedEtcdEnabled: true, + }) + if err != nil { + // AddHost failed - attempt automatic rejoin via HTTP + logger.Warn(). + Err(err). + Msg("failed to create server credentials - attempting automatic rejoin via HTTP") + + // Shutdown the remote client before rejoining; a shutdown failure is only logged + if shutdownErr := remote.Shutdown(); shutdownErr != nil { + logger.Warn().Err(shutdownErr).Msg("failed to shutdown remote client during credential failure") + } + + // Attempt automatic rejoin using the endpoints collected above + embedded, rejoinErr := attemptAutomaticRejoin(ctx, httpEndpoints, cfg, logger) + if rejoinErr != nil { + logger.Error(). + Err(rejoinErr). 
+ Msg("automatic rejoin failed - clearing config and returning to pre-init mode") + + if clearErr := clearGeneratedConfig(cfg, logger); clearErr != nil { + logger.Error().Err(clearErr).Msg("failed to clear generated config") + } + + logger.Info().Msg("returning to pre-initialization mode - use JoinCluster API with valid join token to rejoin") + return NewRemoteEtcd(cfg, logger), nil + } + + logger.Info().Msg("automatic rejoin successful - joined cluster as embedded etcd server") + return embedded, nil + } + + logger.Info().Msg("server credentials obtained, discovering cluster leader") + + // Get the current cluster leader + leader, err := remote.Leader(ctx) + if err != nil { + _ = remote.Shutdown() + return nil, fmt.Errorf("failed to discover etcd leader for client->server transition: %w", err) + } + + logger.Info(). + Str("leader", leader.Name). + Msg("leader discovered, joining as embedded etcd server") + + // Join the cluster as an embedded server + embedded := NewEmbeddedEtcd(cfg, logger) + if err := embedded.Join(ctx, JoinOptions{ + Leader: leader, + Credentials: creds, + }); err != nil { + _ = remote.Shutdown() + return nil, fmt.Errorf("failed to join etcd cluster as embedded server during client->server transition: %w", err) + } + + // Shutdown the remote client - we're now running as an embedded server + if err := remote.Shutdown(); err != nil { + logger.Warn().Err(err).Msg("failed to shutdown temporary remote etcd after client->server transition") + } + + logger.Info().Msg("completed etcd client->server transition; embedded etcd has joined the cluster") + + return embedded, nil +} + +// clearGeneratedConfig clears the generated configuration file, resetting the host +// to a pre-initialized state. 
+func clearGeneratedConfig(cfg *config.Manager, logger zerolog.Logger) error { + appCfg := cfg.Config() + desiredMode := appCfg.EtcdMode + + emptyConfig := config.Config{ + EtcdMode: desiredMode, + EtcdClient: config.EtcdClient{ + Endpoints: nil, + }, + EtcdUsername: "", + EtcdPassword: "", + } + + logger.Info(). + Str("desired_mode", string(desiredMode)). + Msg("clearing generated config but preserving desired etcd mode") + + if err := cfg.UpdateGeneratedConfig(emptyConfig); err != nil { + return fmt.Errorf("failed to update generated config: %w", err) + } + + logger.Info().Msg("generated config cleared - host is now in pre-initialized state") + return nil +} + +// collectHTTPEndpointsFromHosts queries the etcd cluster for host information +// and constructs HTTP endpoints from stored host data. +func collectHTTPEndpointsFromHosts(ctx context.Context, client *clientv3.Client, thisHostID string, logger zerolog.Logger) []string { + hostsResp, err := client.Get(ctx, "/hosts/", clientv3.WithPrefix()) + if err != nil { + logger.Warn().Err(err).Msg("failed to query hosts for HTTP endpoints") + return nil + } + + if hostsResp.Count == 0 { + logger.Warn().Msg("no hosts found in cluster for HTTP endpoint discovery") + return nil + } + + var httpEndpoints []string + for _, kv := range hostsResp.Kvs { + hostData, err := decompressData(kv.Value) + if err != nil { + logger.Warn().Err(err).Str("key", string(kv.Key)).Msg("failed to decompress host data") + continue + } + + // Extract host ID from key + key := string(kv.Key) + parts := strings.Split(key, "/") + if len(parts) < 3 { + continue + } + hostID := parts[2] + + // Skip this host + if hostID == thisHostID { + continue + } + + // Parse host data + var hostInfo struct { + IPv4Address string `json:"ipv4_address"` + HTTPPort int `json:"http_port"` + } + + if err := json.Unmarshal(hostData, &hostInfo); err != nil { + logger.Warn().Err(err).Str("host_id", hostID).Msg("failed to parse host data") + continue + } + + // Construct 
HTTP endpoint + if hostInfo.IPv4Address != "" && hostInfo.HTTPPort > 0 { + httpEndpoint := fmt.Sprintf("http://%s:%d", hostInfo.IPv4Address, hostInfo.HTTPPort) + httpEndpoints = append(httpEndpoints, httpEndpoint) + logger.Info(). + Str("host_id", hostID). + Str("http_endpoint", httpEndpoint). + Msg("constructed HTTP endpoint from host data") + } + } + + return httpEndpoints +} + +// decompressData returns in gunzipped, or in unchanged when it does not start with a complete gzip header (wrong magic bytes, or too short to hold one). +func decompressData(in []byte) ([]byte, error) { + r, err := gzip.NewReader(bytes.NewReader(in)) + if errors.Is(err, gzip.ErrHeader) || errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { + // Not gzip: NewReader reports io.EOF / io.ErrUnexpectedEOF rather than ErrHeader when the input is shorter than the gzip header + return in, nil + } else if err != nil { + return nil, fmt.Errorf("failed to initialize gzip reader: %w", err) + } + defer r.Close() + + out, err := io.ReadAll(r) + if err != nil { + return nil, fmt.Errorf("failed to decompress data: %w", err) + } + return out, nil +} + +// attemptAutomaticRejoin tries to rejoin the cluster automatically using HTTP endpoints. +// It iterates through available HTTP endpoints, requests a join token, obtains credentials, +// and joins the etcd cluster as an embedded server. +func attemptAutomaticRejoin( + ctx context.Context, + httpEndpoints []string, + cfg *config.Manager, + logger zerolog.Logger, +) (Etcd, error) { + if len(httpEndpoints) == 0 { + return nil, fmt.Errorf("no HTTP endpoints available for automatic rejoin") + } + + appCfg := cfg.Config() + + logger.Info(). + Str("host_id", appCfg.HostID). + Int("endpoint_count", len(httpEndpoints)). + Msg("attempting automatic rejoin via HTTP endpoints") + + // Try each HTTP endpoint until one succeeds + var lastErr error + for _, httpEndpoint := range httpEndpoints { + logger.Info(). + Str("endpoint", httpEndpoint). 
+ Msg("trying to get credentials from cluster member") + + // Parse the HTTP endpoint + serverURL, err := url.Parse(httpEndpoint) + if err != nil { + lastErr = fmt.Errorf("failed to parse HTTP endpoint: %w", err) + continue + } + + // Create HTTP client and API wrapper + httpClient := &http.Client{Timeout: 30 * time.Second} + apiClient := CreateAPIClient(serverURL, httpClient) + + // Get join token from the cluster member + joinToken, err := apiClient.GetJoinToken(ctx) + if err != nil { + lastErr = fmt.Errorf("failed to get join token: %w", err) + logger.Warn().Err(err).Str("endpoint", httpEndpoint).Msg("failed to get join token from endpoint") + httpClient.CloseIdleConnections() + continue + } + + // Get join options with credentials for server mode + joinOpts, err := apiClient.GetJoinOptions(ctx, &api.ClusterJoinRequest{ + HostID: api.Identifier(appCfg.HostID), + Hostname: appCfg.Hostname, + Ipv4Address: appCfg.IPv4Address, + Token: joinToken.Token, + EmbeddedEtcdEnabled: true, // server mode + }) + if err != nil { + lastErr = fmt.Errorf("failed to get join options: %w", err) + logger.Warn().Err(err).Str("endpoint", httpEndpoint).Msg("failed to get join options from endpoint") + httpClient.CloseIdleConnections() + continue + } + + // Successfully got credentials - decode and validate them + etcdJoinOpts, err := DecodeJoinCredentials(joinOpts) + if err != nil { + lastErr = fmt.Errorf("failed to decode credentials: %w", err) + httpClient.CloseIdleConnections() + continue + } + + embedded := NewEmbeddedEtcd(cfg, logger) + if err := embedded.Join(ctx, *etcdJoinOpts); err != nil { + httpClient.CloseIdleConnections() + return nil, fmt.Errorf("failed to join cluster: %w", err) + } + httpClient.CloseIdleConnections() + logger.Info().Msg("successfully joined cluster as embedded etcd via automatic rejoin") + return embedded, nil + } + + return nil, fmt.Errorf("failed to rejoin via all endpoints: %w", lastErr) +} diff --git a/server/internal/host/host.go 
b/server/internal/host/host.go index 7c545f45..e2c50bf0 100644 --- a/server/internal/host/host.go +++ b/server/internal/host/host.go @@ -42,6 +42,7 @@ type Host struct { DataDir string Hostname string IPv4Address string + HTTPPort int CPUs int MemBytes uint64 Status *HostStatus @@ -93,6 +94,7 @@ func fromStorage(host *StoredHost, status *StoredHostStatus) (*Host, error) { DataDir: host.DataDir, Hostname: host.Hostname, IPv4Address: host.IPv4Address, + HTTPPort: host.HTTPPort, CPUs: host.CPUs, MemBytes: host.MemBytes, SupportedPgEdgeVersions: host.SupportedPgEdgeVersions, @@ -141,6 +143,7 @@ func toStorage(host *Host) *StoredHost { DataDir: host.DataDir, Hostname: host.Hostname, IPv4Address: host.IPv4Address, + HTTPPort: host.HTTPPort, CPUs: host.CPUs, MemBytes: host.MemBytes, DefaultPgEdgeVersion: host.DefaultPgEdgeVersion, diff --git a/server/internal/host/host_store.go b/server/internal/host/host_store.go index ae0a66fc..29931a03 100644 --- a/server/internal/host/host_store.go +++ b/server/internal/host/host_store.go @@ -21,6 +21,7 @@ type StoredHost struct { DataDir string `json:"data_dir"` Hostname string `json:"hostname"` IPv4Address string `json:"ipv4_address"` + HTTPPort int `json:"http_port"` CPUs int `json:"cpus"` MemBytes uint64 `json:"mem_bytes"` DefaultPgEdgeVersion *PgEdgeVersion `json:"default_version"` diff --git a/server/internal/host/service.go b/server/internal/host/service.go index 29454e0a..ae2ee940 100644 --- a/server/internal/host/service.go +++ b/server/internal/host/service.go @@ -44,6 +44,7 @@ func (s *Service) UpdateHost(ctx context.Context) error { DataDir: s.cfg.DataDir, Hostname: s.cfg.Hostname, IPv4Address: s.cfg.IPv4Address, + HTTPPort: s.cfg.HTTP.Port, // CPUs: resources.CPUs, // MemBytes: resources.MemBytes, // UpdatedAt: time.Now(),