Skip to content

Commit 21607be

Browse files
committed
[INLONG-12058][SDK] Support retry mechanism when "server error" occurs in dataproxy go sdk
1 parent 0bb45cc commit 21607be

File tree

3 files changed

+89
-21
lines changed

3 files changed

+89
-21
lines changed

inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ type Options struct {
7777
addColumnStr string // the string format of the AddColumns, just a cache, used internal
7878
Auth Auth // dataproxy authentication interface
7979
MaxConnLifetime time.Duration // connection max lifetime, default: 0, set to 5m/10m when the servers provide service though CLBs (Cloud Load Balancers)
80+
RetryOnServerError bool // whether to retry on server error, default: false
81+
RetryInitialInterval time.Duration // initial retry interval for exponential backoff, default: 100ms
8082
}
8183

8284
// ValidateAndSetDefault validates an options and set up the default values
@@ -190,5 +192,9 @@ func (options *Options) ValidateAndSetDefault() error {
190192
}
191193
options.addColumnStr = sb.String()
192194

195+
if options.RetryInitialInterval <= 0 {
196+
options.RetryInitialInterval = 100 * time.Millisecond
197+
}
198+
193199
return nil
194200
}

inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options_basic.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,3 +189,20 @@ func WithMaxConnLifetime(lifetime time.Duration) Option {
189189
o.MaxConnLifetime = lifetime
190190
}
191191
}
192+
193+
// WithRetryOnServerError sets RetryOnServerError
194+
func WithRetryOnServerError(retry bool) Option {
195+
return func(o *Options) {
196+
o.RetryOnServerError = retry
197+
}
198+
}
199+
200+
// WithRetryInitialInterval sets RetryInitialInterval
201+
func WithRetryInitialInterval(interval time.Duration) Option {
202+
return func(o *Options) {
203+
if interval <= 0 {
204+
return
205+
}
206+
o.RetryInitialInterval = interval
207+
}
208+
}

inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go

Lines changed: 66 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,12 @@ const (
4141
defaultMapCleanThreshold = 500000
4242
)
4343

44+
// retryableServerErrorCodes defines which server error codes should trigger retry
45+
// and whether connection switch is needed.
46+
var retryableServerErrorCodes = map[int]bool{
47+
2: true, // SERVICE_CLOSED
48+
}
49+
4450
type workerState int32
4551

4652
const (
@@ -52,35 +58,50 @@ const (
5258
)
5359

5460
var (
55-
errOK = &errNo{code: 0, strCode: "0", message: "OK"}
56-
errSendTimeout = &errNo{code: 10001, strCode: "10001", message: "message send timeout"}
57-
errSendFailed = &errNo{code: 10002, strCode: "10002", message: "message send failed"} //nolint:unused
58-
errProducerClosed = &errNo{code: 10003, strCode: "10003", message: "producer already been closed"}
59-
errSendQueueIsFull = &errNo{code: 10004, strCode: "10004", message: "producer send queue is full"}
60-
errContextExpired = &errNo{code: 10005, strCode: "10005", message: "message context expired"}
61-
errNewConnFailed = &errNo{code: 10006, strCode: "10006", message: "new conn failed"}
62-
errConnWriteFailed = &errNo{code: 10007, strCode: "10007", message: "conn write failed"}
63-
errConnReadFailed = &errNo{code: 10008, strCode: "10008", message: "conn read failed"}
64-
errLogTooLong = &errNo{code: 10009, strCode: "10009", message: "input log is too long"} //nolint:unused
65-
errBadLog = &errNo{code: 10010, strCode: "10010", message: "input log is invalid"}
66-
errServerError = &errNo{code: 10011, strCode: "10011", message: "server error"} //nolint:unused
67-
errServerPanic = &errNo{code: 10012, strCode: "10012", message: "server panic"}
68-
workerBusy = &errNo{code: 10013, strCode: "10013", message: "worker is busy"}
69-
errNoMatchReq4Rsp = &errNo{code: 10014, strCode: "10014", message: "no match unacknowledged request for response"}
70-
errConnClosedByPeer = &errNo{code: 10015, strCode: "10015", message: "conn closed by peer"}
71-
errUnknown = &errNo{code: 20001, strCode: "20001", message: "unknown"}
61+
errOK = &errNo{code: 0, strCode: "0", message: "OK", serverErrCode: -1}
62+
errSendTimeout = &errNo{code: 10001, strCode: "10001", message: "message send timeout", serverErrCode: -1}
63+
errSendFailed = &errNo{code: 10002, strCode: "10002", message: "message send failed", serverErrCode: -1} //nolint:unused
64+
errProducerClosed = &errNo{code: 10003, strCode: "10003", message: "producer already been closed", serverErrCode: -1}
65+
errSendQueueIsFull = &errNo{code: 10004, strCode: "10004", message: "producer send queue is full", serverErrCode: -1}
66+
errContextExpired = &errNo{code: 10005, strCode: "10005", message: "message context expired", serverErrCode: -1}
67+
errNewConnFailed = &errNo{code: 10006, strCode: "10006", message: "new conn failed", serverErrCode: -1}
68+
errConnWriteFailed = &errNo{code: 10007, strCode: "10007", message: "conn write failed", serverErrCode: -1}
69+
errConnReadFailed = &errNo{code: 10008, strCode: "10008", message: "conn read failed", serverErrCode: -1}
70+
errLogTooLong = &errNo{code: 10009, strCode: "10009", message: "input log is too long", serverErrCode: -1} //nolint:unused
71+
errBadLog = &errNo{code: 10010, strCode: "10010", message: "input log is invalid", serverErrCode: -1}
72+
errServerError = &errNo{code: 10011, strCode: "10011", message: "server error", serverErrCode: -1} //nolint:unused
73+
errServerPanic = &errNo{code: 10012, strCode: "10012", message: "server panic", serverErrCode: -1}
74+
workerBusy = &errNo{code: 10013, strCode: "10013", message: "worker is busy", serverErrCode: -1}
75+
errNoMatchReq4Rsp = &errNo{code: 10014, strCode: "10014", message: "no match unacknowledged request for response", serverErrCode: -1}
76+
errConnClosedByPeer = &errNo{code: 10015, strCode: "10015", message: "conn closed by peer", serverErrCode: -1}
77+
errUnknown = &errNo{code: 20001, strCode: "20001", message: "unknown", serverErrCode: -1}
7278
)
7379

7480
type errNo struct {
75-
code int
76-
strCode string
77-
message string
81+
code int
82+
strCode string
83+
message string
84+
serverErrCode int // server error code from server response, -1 means not a server error
7885
}
7986

8087
func (e *errNo) Error() string {
8188
return e.message
8289
}
8390

91+
// GetServerErrorCode extracts server error code from error.
92+
func GetServerErrorCode(err error) int {
93+
if err == nil {
94+
return 0
95+
}
96+
var t *errNo
97+
switch {
98+
case errors.As(err, &t):
99+
return t.getServerErrCode()
100+
default:
101+
return -1
102+
}
103+
}
104+
84105
//nolint:unused
85106
func (e *errNo) getCode() int {
86107
return e.code
@@ -90,6 +111,10 @@ func (e *errNo) getStrCode() string {
90111
return e.strCode
91112
}
92113

114+
func (e *errNo) getServerErrCode() int {
115+
return e.serverErrCode
116+
}
117+
93118
func getErrorCode(err error) string {
94119
if err == nil {
95120
return errOK.getStrCode()
@@ -492,7 +517,7 @@ func (w *worker) backoffRetry(ctx context.Context, batch *batchReq) {
492517

493518
// use ExponentialBackoff
494519
backoff := util.ExponentialBackoff{
495-
InitialInterval: 100 * time.Millisecond,
520+
InitialInterval: w.options.RetryInitialInterval,
496521
MaxInterval: 10 * time.Second,
497522
Multiplier: 2.0,
498523
Randomization: 0.2,
@@ -622,6 +647,25 @@ func (w *worker) handleRsp(rsp *batchRsp) {
622647
// call batch.done to release the resources it holds
623648
var err = error(nil)
624649
if rsp.errCode != 0 {
650+
// Check if connection switch is needed
651+
needSwitchConn, isRetryable := retryableServerErrorCodes[rsp.errCode]
652+
if needSwitchConn && w.client != nil {
653+
w.log.Warn("server error detected, switching connection, errCode:", rsp.errCode,
654+
", batchID:", batch.batchID)
655+
w.updateConn(nil, nil)
656+
}
657+
658+
// Check if retry is needed
659+
if w.options.RetryOnServerError && isRetryable && batch.retries < w.options.MaxRetries {
660+
delete(w.unackedBatches, batchID)
661+
662+
w.log.Warn("server error, will retry, errCode:", rsp.errCode,
663+
", batchID:", batch.batchID, ", retries:", batch.retries)
664+
665+
w.backoffRetry(context.Background(), batch)
666+
return
667+
}
668+
625669
err = &errNo{
626670
code: 10011,
627671
strCode: "10011",
@@ -631,6 +675,7 @@ func (w *worker) handleRsp(rsp *batchRsp) {
631675
", groupID=" + rsp.groupID +
632676
", streamID=" + rsp.streamID +
633677
", dt=" + rsp.dt,
678+
serverErrCode: rsp.errCode,
634679
}
635680
w.log.Error("send succeed but got error code:", rsp.errCode)
636681
}

0 commit comments

Comments
 (0)