From dd774921570ccc1d615052c01e17ee1d7d447593 Mon Sep 17 00:00:00 2001 From: proost Date: Mon, 16 Sep 2024 00:09:43 +0900 Subject: [PATCH 1/9] feat: add topic existing validation --- .../beam/sdk/io/gcp/pubsub/PubsubClient.java | 3 + .../sdk/io/gcp/pubsub/PubsubGrpcClient.java | 16 +++++ .../beam/sdk/io/gcp/pubsub/PubsubIO.java | 52 ++++++++++++-- .../sdk/io/gcp/pubsub/PubsubJsonClient.java | 14 ++++ .../sdk/io/gcp/pubsub/PubsubTestClient.java | 5 ++ .../io/gcp/pubsub/PubsubGrpcClientTest.java | 40 +++++++++++ .../beam/sdk/io/gcp/pubsub/PubsubIOTest.java | 69 +++++++++++++++++++ .../io/gcp/pubsub/PubsubJsonClientTest.java | 24 +++++++ 8 files changed, 219 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java index 2964a29dbb6b6..bd01604643e1a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java @@ -507,6 +507,9 @@ public abstract void modifyAckDeadline( /** Return a list of topics for {@code project}. */ public abstract List listTopics(ProjectPath project) throws IOException; + /** Return {@literal true} if {@code topic} exists. */ + public abstract boolean isTopicExists(TopicPath topic) throws IOException; + /** Create {@code subscription} to {@code topic}. */ public abstract void createSubscription( TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java index 93fdd55240074..0cfb06688108a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java @@ -54,6 +54,7 @@ import io.grpc.Channel; import io.grpc.ClientInterceptors; import io.grpc.ManagedChannel; +import io.grpc.StatusRuntimeException; import io.grpc.auth.ClientAuthInterceptor; import io.grpc.netty.GrpcSslContexts; import io.grpc.netty.NegotiationType; @@ -372,6 +373,21 @@ public List listTopics(ProjectPath project) throws IOException { return topics; } + @Override + public boolean isTopicExists(TopicPath topic) throws IOException { + GetTopicRequest request = GetTopicRequest.newBuilder().setTopic(topic.getPath()).build(); + try { + publisherStub().getTopic(request); + return true; + } catch (StatusRuntimeException e) { + if (e.getStatus().getCode() == io.grpc.Status.Code.NOT_FOUND) { + return false; + } + + throw e; + } + } + @Override public void createSubscription( TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index b561b4711d521..0d96dcb9b6bc0 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -50,6 +50,7 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; import org.apache.beam.sdk.metrics.Lineage; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; @@ -861,6 +862,8 @@ public abstract static class Read extends PTransform> abstract ErrorHandler getBadRecordErrorHandler(); + abstract boolean getValidate(); + abstract Builder toBuilder(); static Builder newBuilder(SerializableFunction parseFn) { @@ -872,6 +875,7 @@ static Builder newBuilder(SerializableFunction parseFn) builder.setNeedsOrderingKey(false); builder.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER); builder.setBadRecordErrorHandler(new DefaultErrorHandler<>()); + builder.setValidate(true); return builder; } @@ -919,6 +923,8 @@ abstract static class Builder { abstract Builder setBadRecordErrorHandler( ErrorHandler badRecordErrorHandler); + abstract Builder setValidate(boolean validation); + abstract Read build(); } @@ -944,6 +950,7 @@ public Read fromSubscription(ValueProvider subscription) { return toBuilder() .setSubscriptionProvider( NestedValueProvider.of(subscription, PubsubSubscription::fromPath)) + .setValidate(true) .build(); } @@ -967,6 +974,7 @@ public Read fromTopic(ValueProvider topic) { validateTopic(topic); return toBuilder() .setTopicProvider(NestedValueProvider.of(topic, PubsubTopic::fromPath)) + .setValidate(true) .build(); } @@ -1010,6 +1018,7 @@ public Read withDeadLetterTopic(ValueProvider deadLetterTopic) { return toBuilder() .setDeadLetterTopicProvider( NestedValueProvider.of(deadLetterTopic, PubsubTopic::fromPath)) + .setValidate(true) .build(); } @@ -1027,7 +1036,7 @@ private static void validateTopic(ValueProvider topic) { * PubsubGrpcClientFactory}. */ public Read withClientFactory(PubsubClient.PubsubClientFactory factory) { - return toBuilder().setPubsubClientFactory(factory).build(); + return toBuilder().setPubsubClientFactory(factory).setValidate(true).build(); } /** @@ -1059,7 +1068,7 @@ public Read withClientFactory(PubsubClient.PubsubClientFactory factory) { * @see RFC 3339 */ public Read withTimestampAttribute(String timestampAttribute) { - return toBuilder().setTimestampAttribute(timestampAttribute).build(); + return toBuilder().setTimestampAttribute(timestampAttribute).setValidate(true).build(); } /** @@ -1072,7 +1081,7 @@ public Read withTimestampAttribute(String timestampAttribute) { * delivered, and deduplication of the stream will be strictly best effort. */ public Read withIdAttribute(String idAttribute) { - return toBuilder().setIdAttribute(idAttribute).build(); + return toBuilder().setIdAttribute(idAttribute).setValidate(true).build(); } /** @@ -1082,7 +1091,7 @@ public Read withIdAttribute(String idAttribute) { * PCollection#setCoder(Coder)}. */ public Read withCoderAndParseFn(Coder coder, SimpleFunction parseFn) { - return toBuilder().setCoder(coder).setParseFn(parseFn).build(); + return toBuilder().setCoder(coder).setParseFn(parseFn).setValidate(true).build(); } /** @@ -1095,9 +1104,15 @@ public Read withErrorHandler(ErrorHandler badRecordErrorHandler return toBuilder() .setBadRecordErrorHandler(badRecordErrorHandler) .setBadRecordRouter(BadRecordRouter.RECORDING_ROUTER) + .setValidate(true) .build(); } + /** Disable validation of the existence of the topic. */ + public Read withoutValidation() { + return toBuilder().setValidate(false).build(); + } + @VisibleForTesting /** * Set's the internal Clock. @@ -1258,6 +1273,35 @@ public void process() { return read.setCoder(getCoder()); } + @Override + public void validate(PipelineOptions options) { + if (!getValidate()) { + return; + } + + PubsubOptions psOptions = options.as(PubsubOptions.class); + + // Validate the existence of the topic. + if (getTopicProvider() != null) { + PubsubTopic topic = getTopicProvider().get(); + boolean topicExists = true; + try (PubsubClient pubsubClient = + getPubsubClientFactory() + .newClient(getTimestampAttribute(), getIdAttribute(), psOptions)) { + topicExists = + pubsubClient.isTopicExists( + PubsubClient.topicPathFromName(topic.project, topic.topic)); + } catch (Exception e) { + throw new RuntimeException(e); + } + + if (!topicExists) { + throw new IllegalArgumentException( + String.format("Pubsub topic '%s' does not exist.", topic)); + } + } + } + @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java index 386febcf005ba..0a838da66f696 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java @@ -19,6 +19,7 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; +import com.google.api.client.googleapis.json.GoogleJsonResponseException; import com.google.api.client.http.HttpRequestInitializer; import com.google.api.services.pubsub.Pubsub; import com.google.api.services.pubsub.Pubsub.Projects.Subscriptions; @@ -310,6 +311,19 @@ public List listTopics(ProjectPath project) throws IOException { return topics; } + @Override + public boolean isTopicExists(TopicPath topic) throws IOException { + try { + pubsub.projects().topics().get(topic.getPath()).execute(); + return true; + } catch (GoogleJsonResponseException e) { + if (e.getStatusCode() == 404) { + return false; + } + throw e; + } + } + @Override public void createSubscription( TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java index a8109d05ec380..7ac524eda608c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java @@ -605,6 +605,11 @@ public List listTopics(ProjectPath project) throws IOException { throw new UnsupportedOperationException(); } + @Override + public boolean isTopicExists(TopicPath topic) throws IOException { + throw new UnsupportedOperationException(); + } + @Override public void createSubscription( TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java index 3724e169c6122..6c4625f2e0779 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java @@ -40,6 +40,7 @@ import com.google.pubsub.v1.Topic; import io.grpc.ManagedChannel; import io.grpc.Server; +import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; @@ -432,4 +433,43 @@ public void getSchema(GetSchemaRequest request, StreamObserver responseO server.shutdownNow(); } } + + @Test + public void isTopicExists() throws IOException { + initializeClient(null, null); + TopicPath topicDoesNotExist = + PubsubClient.topicPathFromPath("projects/testProject/topics/dontexist"); + TopicPath topicExists = PubsubClient.topicPathFromPath("projects/testProject/topics/exist"); + + PublisherImplBase publisherImplBase = + new PublisherImplBase() { + @Override + public void getTopic(GetTopicRequest request, StreamObserver responseObserver) { + String topicPath = request.getTopic(); + if (topicPath.equals(topicDoesNotExist.getPath())) { + responseObserver.onError( + new StatusRuntimeException(Status.fromCode(Status.Code.NOT_FOUND))); + } + if (topicPath.equals(topicExists.getPath())) { + responseObserver.onNext( + Topic.newBuilder() + .setName(topicPath) + .setSchemaSettings( + SchemaSettings.newBuilder().setSchema(SCHEMA.getPath()).build()) + .build()); + responseObserver.onCompleted(); + } + } + }; + Server server = + InProcessServerBuilder.forName(channelName).addService(publisherImplBase).build().start(); + try { + assertEquals(false, client.isTopicExists(topicDoesNotExist)); + + assertEquals(true, client.isTopicExists(topicExists)); + + } finally { + server.shutdownNow(); + } + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java index d4effbae40a4c..baeedff70365e 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java @@ -97,6 +97,7 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.junit.runners.model.Statement; +import org.mockito.Mockito; /** Tests for PubsubIO Read and Write transforms. */ @RunWith(JUnit4.class) @@ -928,4 +929,72 @@ public void testBigMessageBounded() throws IOException { pipeline.run(); } } + + @Test + public void testValidate() throws IOException { + PubsubOptions options = TestPipeline.testingPipelineOptions().as(PubsubOptions.class); + TopicPath existingTopic = PubsubClient.topicPathFromName("test-project", "testTopic"); + PubsubClient mockClient = Mockito.mock(PubsubClient.class); + Mockito.when(mockClient.isTopicExists(existingTopic)).thenReturn(true); + PubsubClient.PubsubClientFactory mockFactory = + Mockito.mock(PubsubClient.PubsubClientFactory.class); + Mockito.when(mockFactory.newClient("myTimestamp", "myId", options)).thenReturn(mockClient); + + Read read = + Read.newBuilder() + .setTopicProvider( + StaticValueProvider.of( + PubsubIO.PubsubTopic.fromPath("projects/test-project/topics/testTopic"))) + .setTimestampAttribute("myTimestamp") + .setIdAttribute("myId") + .setPubsubClientFactory(mockFactory) + .setCoder(PubsubMessagePayloadOnlyCoder.of()) + .build(); + + read.validate(options); + } + + @Test + public void testValidateTopicIsNotExists() throws Exception { + thrown.expect(IllegalArgumentException.class); + + PubsubOptions options = TestPipeline.testingPipelineOptions().as(PubsubOptions.class); + TopicPath nonExistingTopic = PubsubClient.topicPathFromName("test-project", "nonExistingTopic"); + PubsubClient mockClient = Mockito.mock(PubsubClient.class); + Mockito.when(mockClient.isTopicExists(nonExistingTopic)).thenReturn(false); + PubsubClient.PubsubClientFactory mockFactory = + Mockito.mock(PubsubClient.PubsubClientFactory.class); + Mockito.when(mockFactory.newClient("myTimestamp", "myId", options)).thenReturn(mockClient); + + Read read = + Read.newBuilder() + .setTopicProvider( + StaticValueProvider.of( + PubsubIO.PubsubTopic.fromPath("projects/test-project/topics/nonExistingTopic"))) + .setTimestampAttribute("myTimestamp") + .setIdAttribute("myId") + .setPubsubClientFactory(mockFactory) + .setCoder(PubsubMessagePayloadOnlyCoder.of()) + .build(); + + read.validate(options); + } + + @Test + public void testWithoutValidation() throws IOException { + PubsubOptions options = TestPipeline.testingPipelineOptions().as(PubsubOptions.class); + TopicPath nonExistingTopic = PubsubClient.topicPathFromName("test-project", "nonExistingTopic"); + PubsubClient mockClient = Mockito.mock(PubsubClient.class); + Mockito.when(mockClient.isTopicExists(nonExistingTopic)).thenReturn(false); + PubsubClient.PubsubClientFactory mockFactory = + Mockito.mock(PubsubClient.PubsubClientFactory.class); + Mockito.when(mockFactory.newClient("myTimestamp", "myId", options)).thenReturn(mockClient); + + Read read = + PubsubIO.readMessages() + .fromTopic("projects/test-project/topics/nonExistingTopic") + .withoutValidation(); + + read.validate(options); + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java index 634ad42c937ae..5ee32825db1fb 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java @@ -23,6 +23,10 @@ import static org.junit.Assert.assertThrows; import static org.mockito.Mockito.when; +import com.google.api.client.googleapis.json.GoogleJsonError; +import com.google.api.client.googleapis.json.GoogleJsonResponseException; +import com.google.api.client.http.HttpHeaders; +import com.google.api.client.http.HttpResponseException; import com.google.api.services.pubsub.Pubsub; import com.google.api.services.pubsub.Pubsub.Projects.Subscriptions; import com.google.api.services.pubsub.Pubsub.Projects.Topics; @@ -308,6 +312,26 @@ private static Topic buildTopic(int i) { return topic; } + @Test + public void isTopicExists() throws Exception { + TopicPath topicExists = + PubsubClient.topicPathFromPath("projects/testProject/topics/topicExists"); + TopicPath topicDoesNotExist = + PubsubClient.topicPathFromPath("projects/testProject/topics/topicDoesNotExist"); + HttpResponseException.Builder builder = + new HttpResponseException.Builder(404, "topic is not found", new HttpHeaders()); + GoogleJsonError error = new GoogleJsonError(); + when(mockPubsub.projects().topics().get(topicExists.getPath()).execute()) + .thenReturn(new Topic().setName(topicExists.getName())); + when(mockPubsub.projects().topics().get(topicDoesNotExist.getPath()).execute()) + .thenThrow(new GoogleJsonResponseException(builder, error)); + + client = new PubsubJsonClient(null, null, mockPubsub); + + assertEquals(true, client.isTopicExists(topicExists)); + assertEquals(false, client.isTopicExists(topicDoesNotExist)); + } + @Test public void listSubscriptions() throws Exception { ListSubscriptionsResponse expectedResponse1 = new ListSubscriptionsResponse(); From 171f1ee84b3ff5972d9ec84cac5aaead0e6c92c1 Mon Sep 17 00:00:00 2001 From: proost Date: Mon, 16 Sep 2024 12:25:40 +0900 Subject: [PATCH 2/9] feat: add validation to write --- .../beam/sdk/io/gcp/pubsub/PubsubIO.java | 69 +++++++++-- .../sdk/io/gcp/pubsub/PubsubTestClient.java | 3 +- .../beam/sdk/io/gcp/pubsub/PubsubIOTest.java | 116 +++++++++++++++++- 3 files changed, 177 insertions(+), 11 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index 0d96dcb9b6bc0..b5869a54dc669 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -1381,6 +1381,8 @@ public abstract static class Write extends PTransform, PDone> abstract ErrorHandler getBadRecordErrorHandler(); + abstract boolean getValidate(); + abstract Builder toBuilder(); static Builder newBuilder( @@ -1390,6 +1392,7 @@ static Builder newBuilder( builder.setFormatFn(formatFn); builder.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER); builder.setBadRecordErrorHandler(new DefaultErrorHandler<>()); + builder.setValidate(true); return builder; } @@ -1426,6 +1429,8 @@ abstract Builder setFormatFn( abstract Builder setBadRecordErrorHandler( ErrorHandler badRecordErrorHandler); + abstract Builder setValidate(boolean validation); + abstract Write build(); } @@ -1436,15 +1441,19 @@ abstract Builder setBadRecordErrorHandler( * {@code topic} string. */ public Write to(String topic) { - return to(StaticValueProvider.of(topic)); + ValueProvider topicProvider = StaticValueProvider.of(topic); + validateTopic(topicProvider); + return to(topicProvider); } /** Like {@code topic()} but with a {@link ValueProvider}. */ public Write to(ValueProvider topic) { + validateTopic(topic); return toBuilder() .setTopicProvider(NestedValueProvider.of(topic, PubsubTopic::fromPath)) .setTopicFunction(null) .setDynamicDestinations(false) + .setValidate(true) .build(); } @@ -1458,9 +1467,17 @@ public Write to(SerializableFunction, String> topicFun .setTopicProvider(null) .setTopicFunction(v -> PubsubTopic.fromPath(topicFunction.apply(v))) .setDynamicDestinations(true) + .setValidate(true) .build(); } + /** Handles validation of {@code topic}. */ + private static void validateTopic(ValueProvider topic) { + if (topic.isAccessible()) { + PubsubTopic.fromPath(topic.get()); + } + } + /** * The default client to write to Pub/Sub is the {@link PubsubJsonClient}, created by the {@link * PubsubJsonClient.PubsubJsonClientFactory}. This function allows to change the Pub/Sub client @@ -1468,7 +1485,7 @@ public Write to(SerializableFunction, String> topicFun * PubsubGrpcClientFactory}. */ public Write withClientFactory(PubsubClient.PubsubClientFactory factory) { - return toBuilder().setPubsubClientFactory(factory).build(); + return toBuilder().setPubsubClientFactory(factory).setValidate(true).build(); } /** @@ -1483,7 +1500,7 @@ public Write withClientFactory(PubsubClient.PubsubClientFactory factory) { * hit. */ public Write withMaxBatchSize(int batchSize) { - return toBuilder().setMaxBatchSize(batchSize).build(); + return toBuilder().setMaxBatchSize(batchSize).setValidate(true).build(); } /** @@ -1491,7 +1508,7 @@ public Write withMaxBatchSize(int batchSize) { * bytes to be sent to Pub/Sub in a single batched message. */ public Write withMaxBatchBytesSize(int maxBatchBytesSize) { - return toBuilder().setMaxBatchBytesSize(maxBatchBytesSize).build(); + return toBuilder().setMaxBatchBytesSize(maxBatchBytesSize).setValidate(true).build(); } /** @@ -1505,7 +1522,7 @@ public Write withMaxBatchBytesSize(int maxBatchBytesSize) { * these timestamps from the appropriate attribute. */ public Write withTimestampAttribute(String timestampAttribute) { - return toBuilder().setTimestampAttribute(timestampAttribute).build(); + return toBuilder().setTimestampAttribute(timestampAttribute).setValidate(true).build(); } /** @@ -1517,11 +1534,11 @@ public Write withTimestampAttribute(String timestampAttribute) { * these unique identifiers from the appropriate attribute. */ public Write withIdAttribute(String idAttribute) { - return toBuilder().setIdAttribute(idAttribute).build(); + return toBuilder().setIdAttribute(idAttribute).setValidate(true).build(); } public Write withPubsubRootUrl(String pubsubRootUrl) { - return toBuilder().setPubsubRootUrl(pubsubRootUrl).build(); + return toBuilder().setPubsubRootUrl(pubsubRootUrl).setValidate(true).build(); } /** @@ -1534,9 +1551,18 @@ public Write withErrorHandler(ErrorHandler badRecordErrorHandle return toBuilder() .setBadRecordErrorHandler(badRecordErrorHandler) .setBadRecordRouter(BadRecordRouter.RECORDING_ROUTER) + .setValidate(true) .build(); } + /** + * Disable validation of the existence of the topic. Validation of the topic works only if the + * topic is set statically and not dynamically. + */ + public Write withoutValidation() { + return toBuilder().setValidate(false).build(); + } + @Override public PDone expand(PCollection input) { if (getTopicProvider() == null && !getDynamicDestinations()) { @@ -1613,6 +1639,35 @@ public void populateDisplayData(DisplayData.Builder builder) { builder, getTimestampAttribute(), getIdAttribute(), getTopicProvider()); } + @Override + public void validate(PipelineOptions options) { + if (!getValidate()) { + return; + } + + PubsubOptions psOptions = options.as(PubsubOptions.class); + + // Validate the existence of the topic. + if (getTopicProvider() != null) { + PubsubTopic topic = getTopicProvider().get(); + boolean topicExists = true; + try (PubsubClient pubsubClient = + getPubsubClientFactory() + .newClient(getTimestampAttribute(), getIdAttribute(), psOptions)) { + topicExists = + pubsubClient.isTopicExists( + PubsubClient.topicPathFromName(topic.project, topic.topic)); + } catch (Exception e) { + throw new RuntimeException(e); + } + + if (!topicExists) { + throw new IllegalArgumentException( + String.format("Pubsub topic '%s' does not exist.", topic)); + } + } + } + /** * Writer to Pubsub which batches messages from bounded collections. * diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java index 7ac524eda608c..3d5a879fce150 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java @@ -607,7 +607,8 @@ public List listTopics(ProjectPath project) throws IOException { @Override public boolean isTopicExists(TopicPath topic) throws IOException { - throw new UnsupportedOperationException(); + // Always return true for testing purposes. + return true; } @Override diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java index baeedff70365e..a3398308658e4 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java @@ -931,7 +931,7 @@ public void testBigMessageBounded() throws IOException { } @Test - public void testValidate() throws IOException { + public void testReadValidate() throws IOException { PubsubOptions options = TestPipeline.testingPipelineOptions().as(PubsubOptions.class); TopicPath existingTopic = PubsubClient.topicPathFromName("test-project", "testTopic"); PubsubClient mockClient = Mockito.mock(PubsubClient.class); @@ -955,7 +955,7 @@ public void testValidate() throws IOException { } @Test - public void testValidateTopicIsNotExists() throws Exception { + public void testReadValidateTopicIsNotExists() throws Exception { thrown.expect(IllegalArgumentException.class); PubsubOptions options = TestPipeline.testingPipelineOptions().as(PubsubOptions.class); @@ -981,7 +981,7 @@ public void testValidateTopicIsNotExists() throws Exception { } @Test - public void testWithoutValidation() throws IOException { + public void testReadWithoutValidation() throws IOException { PubsubOptions options = TestPipeline.testingPipelineOptions().as(PubsubOptions.class); TopicPath nonExistingTopic = PubsubClient.topicPathFromName("test-project", "nonExistingTopic"); PubsubClient mockClient = Mockito.mock(PubsubClient.class); @@ -997,4 +997,114 @@ public void testWithoutValidation() throws IOException { read.validate(options); } + + @Test + public void testWriteTopicValidationSuccess() throws Exception { + PubsubIO.writeStrings().to("projects/my-project/topics/abc"); + PubsubIO.writeStrings().to("projects/my-project/topics/ABC"); + PubsubIO.writeStrings().to("projects/my-project/topics/AbC-DeF"); + PubsubIO.writeStrings().to("projects/my-project/topics/AbC-1234"); + PubsubIO.writeStrings().to("projects/my-project/topics/AbC-1234-_.~%+-_.~%+-_.~%+-abc"); + PubsubIO.writeStrings() + .to( + new StringBuilder() + .append("projects/my-project/topics/A-really-long-one-") + .append( + "111111111111111111111111111111111111111111111111111111111111111111111111111111111") + .append( + "111111111111111111111111111111111111111111111111111111111111111111111111111111111") + .append( + "11111111111111111111111111111111111111111111111111111111111111111111111111") + .toString()); + } + + @Test + public void testWriteTopicValidationBadCharacter() throws Exception { + thrown.expect(IllegalArgumentException.class); + PubsubIO.writeStrings().to("projects/my-project/topics/abc-*-abc"); + } + + @Test + public void testWriteValidationTooLong() throws Exception { + thrown.expect(IllegalArgumentException.class); + PubsubIO.writeStrings() + .to( + new StringBuilder() + .append("projects/my-project/topics/A-really-long-one-") + .append( + "111111111111111111111111111111111111111111111111111111111111111111111111111111111") + .append( + "111111111111111111111111111111111111111111111111111111111111111111111111111111111") + .append( + "1111111111111111111111111111111111111111111111111111111111111111111111111111") + .toString()); + } + + @Test + public void testWriteValidate() throws IOException { + PubsubOptions options = TestPipeline.testingPipelineOptions().as(PubsubOptions.class); + TopicPath existingTopic = PubsubClient.topicPathFromName("test-project", "testTopic"); + PubsubClient mockClient = Mockito.mock(PubsubClient.class); + Mockito.when(mockClient.isTopicExists(existingTopic)).thenReturn(true); + PubsubClient.PubsubClientFactory mockFactory = + Mockito.mock(PubsubClient.PubsubClientFactory.class); + Mockito.when(mockFactory.newClient("myTimestamp", "myId", options)).thenReturn(mockClient); + + PubsubIO.Write write = + PubsubIO.Write.newBuilder() + .setTopicProvider( + StaticValueProvider.of( + PubsubIO.PubsubTopic.fromPath("projects/test-project/topics/testTopic"))) + .setTimestampAttribute("myTimestamp") + .setIdAttribute("myId") + .setDynamicDestinations(false) + .setPubsubClientFactory(mockFactory) + .build(); + + write.validate(options); + } + + @Test + public void testWriteValidateTopicIsNotExists() throws Exception { + thrown.expect(IllegalArgumentException.class); + + PubsubOptions options = TestPipeline.testingPipelineOptions().as(PubsubOptions.class); + TopicPath nonExistingTopic = PubsubClient.topicPathFromName("test-project", "nonExistingTopic"); + PubsubClient mockClient = Mockito.mock(PubsubClient.class); + Mockito.when(mockClient.isTopicExists(nonExistingTopic)).thenReturn(false); + PubsubClient.PubsubClientFactory mockFactory = + Mockito.mock(PubsubClient.PubsubClientFactory.class); + Mockito.when(mockFactory.newClient("myTimestamp", "myId", options)).thenReturn(mockClient); + + PubsubIO.Write write = + PubsubIO.Write.newBuilder() + .setTopicProvider( + StaticValueProvider.of( + PubsubIO.PubsubTopic.fromPath("projects/test-project/topics/nonExistingTopic"))) + .setTimestampAttribute("myTimestamp") + .setIdAttribute("myId") + .setDynamicDestinations(false) + .setPubsubClientFactory(mockFactory) + .build(); + + write.validate(options); + } + + @Test + public void testWithoutValidation() throws IOException { + PubsubOptions options = TestPipeline.testingPipelineOptions().as(PubsubOptions.class); + TopicPath nonExistingTopic = PubsubClient.topicPathFromName("test-project", "nonExistingTopic"); + PubsubClient mockClient = Mockito.mock(PubsubClient.class); + Mockito.when(mockClient.isTopicExists(nonExistingTopic)).thenReturn(false); + PubsubClient.PubsubClientFactory mockFactory = + Mockito.mock(PubsubClient.PubsubClientFactory.class); + Mockito.when(mockFactory.newClient("myTimestamp", "myId", options)).thenReturn(mockClient); + + PubsubIO.Write write = + PubsubIO.writeMessages() + .to("projects/test-project/topics/nonExistingTopic") + .withoutValidation(); + + write.validate(options); + } } From 31d69ea6fcdf0e961c0a0e78379455db5eab292d Mon Sep 17 00:00:00 2001 From: proost Date: Mon, 16 Sep 2024 12:29:32 +0900 Subject: [PATCH 3/9] docs: add changes --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index c194cf730785c..68b3cf502abd3 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -62,7 +62,7 @@ ## I/Os -* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* Check Pub/Sub topic is existing before Read/Write (Java) ([#32465](https://github.com/apache/beam/pull/32465)) ## New Features / Improvements From c7e89b93f75016920b9b6f87730f3fb7d8b42327 Mon Sep 17 00:00:00 2001 From: proost Date: Mon, 16 Sep 2024 21:06:38 +0900 Subject: [PATCH 4/9] docs: change docs --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 68b3cf502abd3..ec17751e5e2bd 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -62,7 +62,7 @@ ## I/Os -* Check Pub/Sub topic is existing before Read/Write (Java) ([#32465](https://github.com/apache/beam/pull/32465)) +* PubsubIO will validate that the Pub/Sub topic exists before running the Read/Write pipeline (Java) ([#32465](https://github.com/apache/beam/pull/32465)) ## New Features / Improvements From 24859f869d28d9ae3efbcbf803c249981725725a Mon Sep 17 00:00:00 2001 From: proost Date: Mon, 16 Sep 2024 21:33:22 +0900 Subject: [PATCH 5/9] refactor: change validate to primitive type --- .../beam/sdk/io/gcp/pubsub/PubsubIO.java | 27 +++++++------------ 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index b5869a54dc669..79dead8cfcda4 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -950,7 +950,6 @@ public Read fromSubscription(ValueProvider subscription) { return toBuilder() .setSubscriptionProvider( NestedValueProvider.of(subscription, PubsubSubscription::fromPath)) - .setValidate(true) .build(); } @@ -974,7 +973,6 @@ public Read fromTopic(ValueProvider topic) { validateTopic(topic); return toBuilder() .setTopicProvider(NestedValueProvider.of(topic, PubsubTopic::fromPath)) - .setValidate(true) .build(); } @@ -1018,7 +1016,6 @@ public Read withDeadLetterTopic(ValueProvider deadLetterTopic) { return toBuilder() .setDeadLetterTopicProvider( NestedValueProvider.of(deadLetterTopic, PubsubTopic::fromPath)) - .setValidate(true) .build(); } @@ -1036,7 +1033,7 @@ private static void validateTopic(ValueProvider topic) { * PubsubGrpcClientFactory}. */ public Read withClientFactory(PubsubClient.PubsubClientFactory factory) { - return toBuilder().setPubsubClientFactory(factory).setValidate(true).build(); + return toBuilder().setPubsubClientFactory(factory).build(); } /** @@ -1068,7 +1065,7 @@ public Read withClientFactory(PubsubClient.PubsubClientFactory factory) { * @see RFC 3339 */ public Read withTimestampAttribute(String timestampAttribute) { - return toBuilder().setTimestampAttribute(timestampAttribute).setValidate(true).build(); + return toBuilder().setTimestampAttribute(timestampAttribute).build(); } /** @@ -1081,7 +1078,7 @@ public Read withTimestampAttribute(String timestampAttribute) { * delivered, and deduplication of the stream will be strictly best effort. */ public Read withIdAttribute(String idAttribute) { - return toBuilder().setIdAttribute(idAttribute).setValidate(true).build(); + return toBuilder().setIdAttribute(idAttribute).build(); } /** @@ -1091,7 +1088,7 @@ public Read withIdAttribute(String idAttribute) { * PCollection#setCoder(Coder)}. */ public Read withCoderAndParseFn(Coder coder, SimpleFunction parseFn) { - return toBuilder().setCoder(coder).setParseFn(parseFn).setValidate(true).build(); + return toBuilder().setCoder(coder).setParseFn(parseFn).build(); } /** @@ -1104,7 +1101,6 @@ public Read withErrorHandler(ErrorHandler badRecordErrorHandler return toBuilder() .setBadRecordErrorHandler(badRecordErrorHandler) .setBadRecordRouter(BadRecordRouter.RECORDING_ROUTER) - .setValidate(true) .build(); } @@ -1453,7 +1449,6 @@ public Write to(ValueProvider topic) { .setTopicProvider(NestedValueProvider.of(topic, PubsubTopic::fromPath)) .setTopicFunction(null) .setDynamicDestinations(false) - .setValidate(true) .build(); } @@ -1467,7 +1462,6 @@ public Write to(SerializableFunction, String> topicFun .setTopicProvider(null) .setTopicFunction(v -> PubsubTopic.fromPath(topicFunction.apply(v))) .setDynamicDestinations(true) - .setValidate(true) .build(); } @@ -1485,7 +1479,7 @@ private static void validateTopic(ValueProvider topic) { * PubsubGrpcClientFactory}. */ public Write withClientFactory(PubsubClient.PubsubClientFactory factory) { - return toBuilder().setPubsubClientFactory(factory).setValidate(true).build(); + return toBuilder().setPubsubClientFactory(factory).build(); } /** @@ -1500,7 +1494,7 @@ public Write withClientFactory(PubsubClient.PubsubClientFactory factory) { * hit. */ public Write withMaxBatchSize(int batchSize) { - return toBuilder().setMaxBatchSize(batchSize).setValidate(true).build(); + return toBuilder().setMaxBatchSize(batchSize).build(); } /** @@ -1508,7 +1502,7 @@ public Write withMaxBatchSize(int batchSize) { * bytes to be sent to Pub/Sub in a single batched message. */ public Write withMaxBatchBytesSize(int maxBatchBytesSize) { - return toBuilder().setMaxBatchBytesSize(maxBatchBytesSize).setValidate(true).build(); + return toBuilder().setMaxBatchBytesSize(maxBatchBytesSize).build(); } /** @@ -1522,7 +1516,7 @@ public Write withMaxBatchBytesSize(int maxBatchBytesSize) { * these timestamps from the appropriate attribute. */ public Write withTimestampAttribute(String timestampAttribute) { - return toBuilder().setTimestampAttribute(timestampAttribute).setValidate(true).build(); + return toBuilder().setTimestampAttribute(timestampAttribute).build(); } /** @@ -1534,11 +1528,11 @@ public Write withTimestampAttribute(String timestampAttribute) { * these unique identifiers from the appropriate attribute. */ public Write withIdAttribute(String idAttribute) { - return toBuilder().setIdAttribute(idAttribute).setValidate(true).build(); + return toBuilder().setIdAttribute(idAttribute).build(); } public Write withPubsubRootUrl(String pubsubRootUrl) { - return toBuilder().setPubsubRootUrl(pubsubRootUrl).setValidate(true).build(); + return toBuilder().setPubsubRootUrl(pubsubRootUrl).build(); } /** @@ -1551,7 +1545,6 @@ public Write withErrorHandler(ErrorHandler badRecordErrorHandle return toBuilder() .setBadRecordErrorHandler(badRecordErrorHandler) .setBadRecordRouter(BadRecordRouter.RECORDING_ROUTER) - .setValidate(true) .build(); } From a09e32e504cfd6db6c3d4e7d4447d6bfe9955216 Mon Sep 17 00:00:00 2001 From: proost Date: Mon, 16 Sep 2024 21:39:11 +0900 Subject: [PATCH 6/9] test: change test more clearly --- .../apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java index a3398308658e4..0fe3c4f150ae5 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java @@ -83,6 +83,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.apache.commons.lang3.RandomStringUtils; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -1010,11 +1011,7 @@ public void testWriteTopicValidationSuccess() throws Exception { new StringBuilder() .append("projects/my-project/topics/A-really-long-one-") .append( - "111111111111111111111111111111111111111111111111111111111111111111111111111111111") - .append( - "111111111111111111111111111111111111111111111111111111111111111111111111111111111") - .append( - "11111111111111111111111111111111111111111111111111111111111111111111111111") + RandomStringUtils.randomAlphanumeric(100)) .toString()); } @@ -1032,11 +1029,7 @@ public void testWriteValidationTooLong() throws Exception { new StringBuilder() .append("projects/my-project/topics/A-really-long-one-") .append( - "111111111111111111111111111111111111111111111111111111111111111111111111111111111") - .append( - "111111111111111111111111111111111111111111111111111111111111111111111111111111111") - .append( - "1111111111111111111111111111111111111111111111111111111111111111111111111111") + RandomStringUtils.randomAlphanumeric(1000)) .toString()); } From 14bc75606337d434542dcb7a2d6c18317cb69214 Mon Sep 17 00:00:00 2001 From: proost Date: Mon, 16 Sep 2024 21:46:05 +0900 Subject: [PATCH 7/9] style: follow lint --- .../org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java index 0fe3c4f150ae5..0f4c929619a5a 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java @@ -1010,8 +1010,7 @@ public void testWriteTopicValidationSuccess() throws Exception { .to( new StringBuilder() .append("projects/my-project/topics/A-really-long-one-") - .append( - RandomStringUtils.randomAlphanumeric(100)) + .append(RandomStringUtils.randomAlphanumeric(100)) .toString()); } @@ -1028,8 +1027,7 @@ public void testWriteValidationTooLong() throws Exception { .to( new StringBuilder() .append("projects/my-project/topics/A-really-long-one-") - .append( - RandomStringUtils.randomAlphanumeric(1000)) + .append(RandomStringUtils.randomAlphanumeric(1000)) .toString()); } From 4ff4b549238204362640d1427da09f5fb0b777df Mon Sep 17 00:00:00 2001 From: proost Date: Wed, 25 Sep 2024 23:53:19 +0900 Subject: [PATCH 8/9] docs: fix CHANGES --- CHANGES.md | 3 --- 1 file changed, 3 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 6469d68e3c86c..88a1319fb5a06 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -57,9 +57,6 @@ ## Highlights -* New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)). -* New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)). - ## I/Os * Added support for using vLLM in the RunInference transform (Python) ([#32528](https://github.com/apache/beam/issues/32528)) From a298d50195fe8e8375fd79872b6dc1969819b852 Mon Sep 17 00:00:00 2001 From: proost Date: Wed, 25 Sep 2024 23:54:58 +0900 Subject: [PATCH 9/9] docs: follow changes --- CHANGES.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 88a1319fb5a06..a0133bd531ca3 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -57,9 +57,10 @@ ## Highlights +* Added support for using vLLM in the RunInference transform (Python) ([#32528](https://github.com/apache/beam/issues/32528)) + ## I/Os -* Added support for using vLLM in the RunInference transform (Python) ([#32528](https://github.com/apache/beam/issues/32528)) * PubsubIO will validate that the Pub/Sub topic exists before running the Read/Write pipeline (Java) ([#32465](https://github.com/apache/beam/pull/32465)) ## New Features / Improvements