Skip to content

Commit 9427c33

Browse files
committed
Optimize code to ensure consistency with documentation
Signed-off-by: zane-neo <zaniu@amazon.com>
1 parent 094b7e4 commit 9427c33

File tree

10 files changed

+73
-96
lines changed

10 files changed

+73
-96
lines changed

common/src/main/java/org/opensearch/ml/common/connector/ConnectorClientConfig.java

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,16 @@ public class ConnectorClientConfig implements ToXContentObject, Writeable {
3636
public static final String RETRY_BACKOFF_POLICY_FIELD = "retry_backoff_policy";
3737

3838
public static final Integer MAX_CONNECTION_DEFAULT_VALUE = 30;
39-
public static final Integer CONNECTION_TIMEOUT_DEFAULT_VALUE = 1;
39+
public static final Integer CONNECTION_TIMEOUT_DEFAULT_VALUE = 1000;
4040
public static final Integer READ_TIMEOUT_DEFAULT_VALUE = 10;
4141
public static final Integer RETRY_BACKOFF_MILLIS_DEFAULT_VALUE = 200;
4242
public static final Integer RETRY_TIMEOUT_SECONDS_DEFAULT_VALUE = 30;
4343
public static final Integer MAX_RETRY_TIMES_DEFAULT_VALUE = 0;
4444
public static final RetryBackoffPolicy RETRY_BACKOFF_POLICY_DEFAULT_VALUE = RetryBackoffPolicy.CONSTANT;
4545
public static final Version MINIMAL_SUPPORTED_VERSION_FOR_RETRY = Version.V_2_15_0;
4646
private Integer maxConnections;
47-
private Integer connectionTimeout;
48-
private Integer readTimeout;
47+
private Integer connectionTimeoutMillis;
48+
private Integer readTimeoutSeconds;
4949
private Integer retryBackoffMillis;
5050
private Integer retryTimeoutSeconds;
5151
private Integer maxRetryTimes;
@@ -54,16 +54,16 @@ public class ConnectorClientConfig implements ToXContentObject, Writeable {
5454
@Builder(toBuilder = true)
5555
public ConnectorClientConfig(
5656
Integer maxConnections,
57-
Integer connectionTimeout,
58-
Integer readTimeout,
57+
Integer connectionTimeoutMillis,
58+
Integer readTimeoutSeconds,
5959
Integer retryBackoffMillis,
6060
Integer retryTimeoutSeconds,
6161
Integer maxRetryTimes,
6262
RetryBackoffPolicy retryBackoffPolicy
6363
) {
6464
this.maxConnections = maxConnections;
65-
this.connectionTimeout = connectionTimeout;
66-
this.readTimeout = readTimeout;
65+
this.connectionTimeoutMillis = connectionTimeoutMillis;
66+
this.readTimeoutSeconds = readTimeoutSeconds;
6767
this.retryBackoffMillis = retryBackoffMillis;
6868
this.retryTimeoutSeconds = retryTimeoutSeconds;
6969
this.maxRetryTimes = maxRetryTimes;
@@ -73,8 +73,8 @@ public ConnectorClientConfig(
7373
public ConnectorClientConfig(StreamInput input) throws IOException {
7474
Version streamInputVersion = input.getVersion();
7575
this.maxConnections = input.readOptionalInt();
76-
this.connectionTimeout = input.readOptionalInt();
77-
this.readTimeout = input.readOptionalInt();
76+
this.connectionTimeoutMillis = input.readOptionalInt();
77+
this.readTimeoutSeconds = input.readOptionalInt();
7878
if (streamInputVersion.onOrAfter(MINIMAL_SUPPORTED_VERSION_FOR_RETRY)) {
7979
this.retryBackoffMillis = input.readOptionalInt();
8080
this.retryTimeoutSeconds = input.readOptionalInt();
@@ -87,8 +87,8 @@ public ConnectorClientConfig(StreamInput input) throws IOException {
8787

8888
public ConnectorClientConfig() {
8989
this.maxConnections = MAX_CONNECTION_DEFAULT_VALUE;
90-
this.connectionTimeout = CONNECTION_TIMEOUT_DEFAULT_VALUE;
91-
this.readTimeout = READ_TIMEOUT_DEFAULT_VALUE;
90+
this.connectionTimeoutMillis = CONNECTION_TIMEOUT_DEFAULT_VALUE;
91+
this.readTimeoutSeconds = READ_TIMEOUT_DEFAULT_VALUE;
9292
this.retryBackoffMillis = RETRY_BACKOFF_MILLIS_DEFAULT_VALUE;
9393
this.retryTimeoutSeconds = RETRY_TIMEOUT_SECONDS_DEFAULT_VALUE;
9494
this.maxRetryTimes = MAX_RETRY_TIMES_DEFAULT_VALUE;
@@ -99,8 +99,8 @@ public ConnectorClientConfig() {
9999
public void writeTo(StreamOutput out) throws IOException {
100100
Version streamOutputVersion = out.getVersion();
101101
out.writeOptionalInt(maxConnections);
102-
out.writeOptionalInt(connectionTimeout);
103-
out.writeOptionalInt(readTimeout);
102+
out.writeOptionalInt(connectionTimeoutMillis);
103+
out.writeOptionalInt(readTimeoutSeconds);
104104
if (streamOutputVersion.onOrAfter(MINIMAL_SUPPORTED_VERSION_FOR_RETRY)) {
105105
out.writeOptionalInt(retryBackoffMillis);
106106
out.writeOptionalInt(retryTimeoutSeconds);
@@ -120,11 +120,11 @@ public XContentBuilder toXContent(XContentBuilder xContentBuilder, Params params
120120
if (maxConnections != null) {
121121
builder.field(MAX_CONNECTION_FIELD, maxConnections);
122122
}
123-
if (connectionTimeout != null) {
124-
builder.field(CONNECTION_TIMEOUT_FIELD, connectionTimeout);
123+
if (connectionTimeoutMillis != null) {
124+
builder.field(CONNECTION_TIMEOUT_FIELD, connectionTimeoutMillis);
125125
}
126-
if (readTimeout != null) {
127-
builder.field(READ_TIMEOUT_FIELD, readTimeout);
126+
if (readTimeoutSeconds != null) {
127+
builder.field(READ_TIMEOUT_FIELD, readTimeoutSeconds);
128128
}
129129
if (retryBackoffMillis != null) {
130130
builder.field(RETRY_BACKOFF_MILLIS_FIELD, retryBackoffMillis);
@@ -190,8 +190,8 @@ public static ConnectorClientConfig parse(XContentParser parser) throws IOExcept
190190
return ConnectorClientConfig
191191
.builder()
192192
.maxConnections(maxConnections)
193-
.connectionTimeout(connectionTimeout)
194-
.readTimeout(readTimeout)
193+
.connectionTimeoutMillis(connectionTimeout)
194+
.readTimeoutSeconds(readTimeout)
195195
.retryBackoffMillis(retryBackoffMillis)
196196
.retryTimeoutSeconds(retryTimeoutSeconds)
197197
.maxRetryTimes(maxRetryTimes)

common/src/test/java/org/opensearch/ml/common/connector/ConnectorClientConfigTest.java

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ public void writeTo_ReadFromStream() throws IOException {
2424
ConnectorClientConfig config = ConnectorClientConfig
2525
.builder()
2626
.maxConnections(10)
27-
.connectionTimeout(5000)
28-
.readTimeout(3000)
27+
.connectionTimeoutMillis(1000)
28+
.readTimeoutSeconds(3)
2929
.retryBackoffMillis(123)
3030
.retryTimeoutSeconds(456)
3131
.maxRetryTimes(789)
@@ -55,8 +55,8 @@ public void writeTo_ReadFromStream_diffVersionThenNotProcessRetryOptions() throw
5555
ConnectorClientConfig config = ConnectorClientConfig
5656
.builder()
5757
.maxConnections(10)
58-
.connectionTimeout(5000)
59-
.readTimeout(3000)
58+
.connectionTimeoutMillis(1000)
59+
.readTimeoutSeconds(3)
6060
.retryBackoffMillis(123)
6161
.retryTimeoutSeconds(456)
6262
.maxRetryTimes(789)
@@ -71,8 +71,8 @@ public void writeTo_ReadFromStream_diffVersionThenNotProcessRetryOptions() throw
7171
ConnectorClientConfig readConfig = ConnectorClientConfig.fromStream(input);
7272

7373
Assert.assertEquals(Integer.valueOf(10), readConfig.getMaxConnections());
74-
Assert.assertEquals(Integer.valueOf(5000), readConfig.getConnectionTimeout());
75-
Assert.assertEquals(Integer.valueOf(3000), readConfig.getReadTimeout());
74+
Assert.assertEquals(Integer.valueOf(1000), readConfig.getConnectionTimeoutMillis());
75+
Assert.assertEquals(Integer.valueOf(3), readConfig.getReadTimeoutSeconds());
7676
Assert.assertNull(readConfig.getRetryBackoffMillis());
7777
Assert.assertNull(readConfig.getRetryTimeoutSeconds());
7878
Assert.assertNull(readConfig.getMaxRetryTimes());
@@ -84,8 +84,8 @@ public void toXContent() throws IOException {
8484
ConnectorClientConfig config = ConnectorClientConfig
8585
.builder()
8686
.maxConnections(10)
87-
.connectionTimeout(5000)
88-
.readTimeout(3000)
87+
.connectionTimeoutMillis(1000)
88+
.readTimeoutSeconds(3)
8989
.retryBackoffMillis(123)
9090
.retryTimeoutSeconds(456)
9191
.maxRetryTimes(789)
@@ -96,14 +96,14 @@ public void toXContent() throws IOException {
9696
config.toXContent(builder, ToXContent.EMPTY_PARAMS);
9797
String content = TestHelper.xContentBuilderToString(builder);
9898

99-
String expectedJson = "{\"max_connection\":10,\"connection_timeout\":5000,\"read_timeout\":3000,"
99+
String expectedJson = "{\"max_connection\":10,\"connection_timeout\":1000,\"read_timeout\":3,"
100100
+ "\"retry_backoff_millis\":123,\"retry_timeout_seconds\":456,\"max_retry_times\":789,\"retry_backoff_policy\":\"constant\"}";
101101
Assert.assertEquals(expectedJson, content);
102102
}
103103

104104
@Test
105105
public void parse() throws IOException {
106-
String jsonStr = "{\"max_connection\":10,\"connection_timeout\":5000,\"read_timeout\":3000,"
106+
String jsonStr = "{\"max_connection\":10,\"connection_timeout\":5000,\"read_timeout\":3,"
107107
+ "\"retry_backoff_millis\":123,\"retry_timeout_seconds\":456,\"max_retry_times\":789,\"retry_backoff_policy\":\"constant\"}";
108108
XContentParser parser = XContentType.JSON
109109
.xContent()
@@ -117,8 +117,8 @@ public void parse() throws IOException {
117117
ConnectorClientConfig config = ConnectorClientConfig.parse(parser);
118118

119119
Assert.assertEquals(Integer.valueOf(10), config.getMaxConnections());
120-
Assert.assertEquals(Integer.valueOf(5000), config.getConnectionTimeout());
121-
Assert.assertEquals(Integer.valueOf(3000), config.getReadTimeout());
120+
Assert.assertEquals(Integer.valueOf(5000), config.getConnectionTimeoutMillis());
121+
Assert.assertEquals(Integer.valueOf(3), config.getReadTimeoutSeconds());
122122
Assert.assertEquals(Integer.valueOf(123), config.getRetryBackoffMillis());
123123
Assert.assertEquals(Integer.valueOf(456), config.getRetryTimeoutSeconds());
124124
Assert.assertEquals(Integer.valueOf(789), config.getMaxRetryTimes());
@@ -127,7 +127,7 @@ public void parse() throws IOException {
127127

128128
@Test
129129
public void parse_whenMalformedBackoffPolicy_thenFail() throws IOException {
130-
String jsonStr = "{\"max_connection\":10,\"connection_timeout\":5000,\"read_timeout\":3000,"
130+
String jsonStr = "{\"max_connection\":10,\"connection_timeout\":5000,\"read_timeout\":3,"
131131
+ "\"retry_backoff_millis\":123,\"retry_timeout_seconds\":456,\"max_retry_times\":789,\"retry_backoff_policy\":\"test\"}";
132132
XContentParser parser = XContentType.JSON
133133
.xContent()
@@ -147,8 +147,8 @@ public void testDefaultValues() {
147147
ConnectorClientConfig config = ConnectorClientConfig.builder().build();
148148

149149
Assert.assertNull(config.getMaxConnections());
150-
Assert.assertNull(config.getConnectionTimeout());
151-
Assert.assertNull(config.getReadTimeout());
150+
Assert.assertNull(config.getConnectionTimeoutMillis());
151+
Assert.assertNull(config.getReadTimeoutSeconds());
152152
Assert.assertNull(config.getRetryBackoffMillis());
153153
Assert.assertNull(config.getRetryTimeoutSeconds());
154154
Assert.assertNull(config.getMaxRetryTimes());
@@ -160,8 +160,8 @@ public void testDefaultValuesInitByNewInstance() {
160160
ConnectorClientConfig config = new ConnectorClientConfig();
161161

162162
Assert.assertEquals(Integer.valueOf(30), config.getMaxConnections());
163-
Assert.assertEquals(Integer.valueOf(1), config.getConnectionTimeout());
164-
Assert.assertEquals(Integer.valueOf(10), config.getReadTimeout());
163+
Assert.assertEquals(Integer.valueOf(1000), config.getConnectionTimeoutMillis());
164+
Assert.assertEquals(Integer.valueOf(10), config.getReadTimeoutSeconds());
165165
Assert.assertEquals(Integer.valueOf(200), config.getRetryBackoffMillis());
166166
Assert.assertEquals(Integer.valueOf(30), config.getRetryTimeoutSeconds());
167167
Assert.assertEquals(Integer.valueOf(0), config.getMaxRetryTimes());

common/src/test/java/org/opensearch/ml/common/transport/connector/MLCreateConnectorInputTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -324,8 +324,8 @@ public void testParse() throws Exception {
324324
testParseFromJsonString(expectedInputStr, parsedInput -> {
325325
assertEquals(TEST_CONNECTOR_NAME, parsedInput.getName());
326326
assertEquals(20, parsedInput.getConnectorClientConfig().getMaxConnections().intValue());
327-
assertEquals(10000, parsedInput.getConnectorClientConfig().getReadTimeout().intValue());
328-
assertEquals(10000, parsedInput.getConnectorClientConfig().getConnectionTimeout().intValue());
327+
assertEquals(10000, parsedInput.getConnectorClientConfig().getReadTimeoutSeconds().intValue());
328+
assertEquals(10000, parsedInput.getConnectorClientConfig().getConnectionTimeoutMillis().intValue());
329329
});
330330
}
331331

ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/remote/AbstractConnectorExecutor.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,26 @@
55

66
package org.opensearch.ml.engine.algorithms.remote;
77

8+
import java.time.Duration;
9+
import java.util.concurrent.atomic.AtomicReference;
10+
811
import org.opensearch.ml.common.connector.Connector;
912
import org.opensearch.ml.common.connector.ConnectorClientConfig;
13+
import org.opensearch.ml.common.httpclient.MLHttpClientFactory;
1014

1115
import lombok.Getter;
1216
import lombok.Setter;
17+
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
1318

1419
@Setter
1520
@Getter
1621
public abstract class AbstractConnectorExecutor implements RemoteConnectorExecutor {
22+
23+
@Setter
24+
private volatile boolean connectorPrivateIpEnabled;
25+
26+
private final AtomicReference<SdkAsyncHttpClient> httpClientRef = new AtomicReference<>();
27+
1728
private ConnectorClientConfig connectorClientConfig;
1829

1930
public void initialize(Connector connector) {
@@ -23,4 +34,18 @@ public void initialize(Connector connector) {
2334
connectorClientConfig = new ConnectorClientConfig();
2435
}
2536
}
37+
38+
protected SdkAsyncHttpClient getHttpClient() {
39+
if (httpClientRef.get() == null) {
40+
Duration connectionTimeout = Duration.ofMillis(connectorClientConfig.getConnectionTimeoutMillis());
41+
Duration readTimeout = Duration.ofSeconds(connectorClientConfig.getReadTimeoutSeconds());
42+
Integer maxConnection = connectorClientConfig.getMaxConnections();
43+
this.httpClientRef
44+
.compareAndSet(
45+
null,
46+
MLHttpClientFactory.getAsyncHttpClient(connectionTimeout, readTimeout, maxConnection, connectorPrivateIpEnabled)
47+
);
48+
}
49+
return httpClientRef.get();
50+
}
2651
}

ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/remote/AwsConnectorExecutor.java

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,9 @@
1414

1515
import java.security.AccessController;
1616
import java.security.PrivilegedExceptionAction;
17-
import java.time.Duration;
1817
import java.util.Locale;
1918
import java.util.Map;
2019
import java.util.concurrent.CompletableFuture;
21-
import java.util.concurrent.atomic.AtomicReference;
2220

2321
import org.apache.commons.text.StringEscapeUtils;
2422
import org.apache.logging.log4j.Logger;
@@ -28,7 +26,6 @@
2826
import org.opensearch.ml.common.connector.AwsConnector;
2927
import org.opensearch.ml.common.connector.Connector;
3028
import org.opensearch.ml.common.exception.MLException;
31-
import org.opensearch.ml.common.httpclient.MLHttpClientFactory;
3229
import org.opensearch.ml.common.input.MLInput;
3330
import org.opensearch.ml.common.model.MLGuard;
3431
import org.opensearch.ml.common.output.model.ModelTensors;
@@ -41,15 +38,12 @@
4138
import org.opensearch.transport.StreamTransportService;
4239
import org.opensearch.transport.client.Client;
4340

44-
import com.google.common.annotations.VisibleForTesting;
45-
4641
import lombok.Getter;
4742
import lombok.Setter;
4843
import lombok.extern.log4j.Log4j2;
4944
import software.amazon.awssdk.core.internal.http.async.SimpleHttpContentPublisher;
5045
import software.amazon.awssdk.http.SdkHttpFullRequest;
5146
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
52-
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
5347

5448
@Log4j2
5549
@ConnectorExecutor(AWS_SIGV4)
@@ -73,8 +67,6 @@ public class AwsConnectorExecutor extends AbstractConnectorExecutor {
7367
@Getter
7468
private MLGuard mlGuard;
7569

76-
private final AtomicReference<SdkAsyncHttpClient> httpClientRef = new AtomicReference<>();
77-
7870
@Setter
7971
@Getter
8072
private StreamTransportService streamTransportService;
@@ -183,19 +175,4 @@ private void validateLLMInterface(String llmInterface) {
183175
throw new IllegalArgumentException(String.format("Unsupported llm interface: %s", llmInterface));
184176
}
185177
}
186-
187-
@VisibleForTesting
188-
protected SdkAsyncHttpClient getHttpClient() {
189-
if (httpClientRef.get() == null) {
190-
Duration connectionTimeout = Duration.ofSeconds(super.getConnectorClientConfig().getConnectionTimeout());
191-
Duration readTimeout = Duration.ofSeconds(super.getConnectorClientConfig().getReadTimeout());
192-
Integer maxConnection = super.getConnectorClientConfig().getMaxConnections();
193-
this.httpClientRef
194-
.compareAndSet(
195-
null,
196-
MLHttpClientFactory.getAsyncHttpClient(connectionTimeout, readTimeout, maxConnection, connectorPrivateIpEnabled)
197-
);
198-
}
199-
return httpClientRef.get();
200-
}
201178
}

ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/remote/HttpJsonConnectorExecutor.java

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,9 @@
1313

1414
import java.security.AccessController;
1515
import java.security.PrivilegedExceptionAction;
16-
import java.time.Duration;
1716
import java.util.Locale;
1817
import java.util.Map;
1918
import java.util.concurrent.CompletableFuture;
20-
import java.util.concurrent.atomic.AtomicReference;
2119

2220
import org.apache.commons.text.StringEscapeUtils;
2321
import org.apache.logging.log4j.Logger;
@@ -27,7 +25,6 @@
2725
import org.opensearch.ml.common.connector.Connector;
2826
import org.opensearch.ml.common.connector.HttpConnector;
2927
import org.opensearch.ml.common.exception.MLException;
30-
import org.opensearch.ml.common.httpclient.MLHttpClientFactory;
3128
import org.opensearch.ml.common.input.MLInput;
3229
import org.opensearch.ml.common.model.MLGuard;
3330
import org.opensearch.ml.common.output.model.ModelTensors;
@@ -40,15 +37,12 @@
4037
import org.opensearch.transport.StreamTransportService;
4138
import org.opensearch.transport.client.Client;
4239

43-
import com.google.common.annotations.VisibleForTesting;
44-
4540
import lombok.Getter;
4641
import lombok.Setter;
4742
import lombok.extern.log4j.Log4j2;
4843
import software.amazon.awssdk.core.internal.http.async.SimpleHttpContentPublisher;
4944
import software.amazon.awssdk.http.SdkHttpFullRequest;
5045
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
51-
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
5246

5347
@Log4j2
5448
@ConnectorExecutor(HTTP)
@@ -72,10 +66,6 @@ public class HttpJsonConnectorExecutor extends AbstractConnectorExecutor {
7266
@Setter
7367
@Getter
7468
private MLGuard mlGuard;
75-
@Setter
76-
private volatile boolean connectorPrivateIpEnabled;
77-
78-
private final AtomicReference<SdkAsyncHttpClient> httpClientRef = new AtomicReference<>();
7969

8070
@Setter
8171
@Getter
@@ -173,19 +163,4 @@ private void validateLLMInterface(String llmInterface) {
173163
throw new IllegalArgumentException(String.format("Unsupported llm interface: %s", llmInterface));
174164
}
175165
}
176-
177-
@VisibleForTesting
178-
protected SdkAsyncHttpClient getHttpClient() {
179-
if (httpClientRef.get() == null) {
180-
Duration connectionTimeout = Duration.ofSeconds(super.getConnectorClientConfig().getConnectionTimeout());
181-
Duration readTimeout = Duration.ofSeconds(super.getConnectorClientConfig().getReadTimeout());
182-
Integer maxConnection = super.getConnectorClientConfig().getMaxConnections();
183-
this.httpClientRef
184-
.compareAndSet(
185-
null,
186-
MLHttpClientFactory.getAsyncHttpClient(connectionTimeout, readTimeout, maxConnection, connectorPrivateIpEnabled)
187-
);
188-
}
189-
return httpClientRef.get();
190-
}
191166
}

0 commit comments

Comments
 (0)