Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add topic existing validation #32465

Merged
merged 10 commits into from
Sep 25, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,9 @@ public abstract void modifyAckDeadline(
/** Return a list of topics for {@code project}. */
public abstract List<TopicPath> listTopics(ProjectPath project) throws IOException;

/** Return {@literal true} if {@code topic} exists. */
public abstract boolean isTopicExists(TopicPath topic) throws IOException;

/** Create {@code subscription} to {@code topic}. */
public abstract void createSubscription(
TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import io.grpc.Channel;
import io.grpc.ClientInterceptors;
import io.grpc.ManagedChannel;
import io.grpc.StatusRuntimeException;
import io.grpc.auth.ClientAuthInterceptor;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NegotiationType;
Expand Down Expand Up @@ -372,6 +373,21 @@ public List<TopicPath> listTopics(ProjectPath project) throws IOException {
return topics;
}

@Override
public boolean isTopicExists(TopicPath topic) throws IOException {
GetTopicRequest request = GetTopicRequest.newBuilder().setTopic(topic.getPath()).build();
try {
publisherStub().getTopic(request);
return true;
} catch (StatusRuntimeException e) {
if (e.getStatus().getCode() == io.grpc.Status.Code.NOT_FOUND) {
return false;
}

throw e;
Copy link
Contributor

@Abacn Abacn Sep 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a breaking change. Pipeline construction environment may not have access to the PubSub. Here it should either be fail safe or not enabled by default.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Abacn
Oh, I missed that point. How about disabled as default? Checking topic existence is needed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about disabled as default?

sounds good

}
}

@Override
public void createSubscription(
TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
Expand Down Expand Up @@ -861,6 +862,8 @@ public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>>

abstract ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();

abstract boolean getValidate();

abstract Builder<T> toBuilder();

static <T> Builder<T> newBuilder(SerializableFunction<PubsubMessage, T> parseFn) {
Expand All @@ -872,6 +875,7 @@ static <T> Builder<T> newBuilder(SerializableFunction<PubsubMessage, T> parseFn)
builder.setNeedsOrderingKey(false);
builder.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER);
builder.setBadRecordErrorHandler(new DefaultErrorHandler<>());
builder.setValidate(true);
proost marked this conversation as resolved.
Show resolved Hide resolved
return builder;
}

Expand Down Expand Up @@ -919,6 +923,8 @@ abstract static class Builder<T> {
abstract Builder<T> setBadRecordErrorHandler(
ErrorHandler<BadRecord, ?> badRecordErrorHandler);

abstract Builder<T> setValidate(boolean validation);

abstract Read<T> build();
}

Expand All @@ -944,6 +950,7 @@ public Read<T> fromSubscription(ValueProvider<String> subscription) {
return toBuilder()
.setSubscriptionProvider(
NestedValueProvider.of(subscription, PubsubSubscription::fromPath))
.setValidate(true)
.build();
}

Expand All @@ -967,6 +974,7 @@ public Read<T> fromTopic(ValueProvider<String> topic) {
validateTopic(topic);
return toBuilder()
.setTopicProvider(NestedValueProvider.of(topic, PubsubTopic::fromPath))
.setValidate(true)
.build();
}

Expand Down Expand Up @@ -1010,6 +1018,7 @@ public Read<T> withDeadLetterTopic(ValueProvider<String> deadLetterTopic) {
return toBuilder()
.setDeadLetterTopicProvider(
NestedValueProvider.of(deadLetterTopic, PubsubTopic::fromPath))
.setValidate(true)
.build();
}

Expand All @@ -1027,7 +1036,7 @@ private static void validateTopic(ValueProvider<String> topic) {
* PubsubGrpcClientFactory}.
*/
public Read<T> withClientFactory(PubsubClient.PubsubClientFactory factory) {
return toBuilder().setPubsubClientFactory(factory).build();
return toBuilder().setPubsubClientFactory(factory).setValidate(true).build();
}

/**
Expand Down Expand Up @@ -1059,7 +1068,7 @@ public Read<T> withClientFactory(PubsubClient.PubsubClientFactory factory) {
* @see <a href="https://www.ietf.org/rfc/rfc3339.txt">RFC 3339</a>
*/
public Read<T> withTimestampAttribute(String timestampAttribute) {
return toBuilder().setTimestampAttribute(timestampAttribute).build();
return toBuilder().setTimestampAttribute(timestampAttribute).setValidate(true).build();
}

/**
Expand All @@ -1072,7 +1081,7 @@ public Read<T> withTimestampAttribute(String timestampAttribute) {
* delivered, and deduplication of the stream will be strictly best effort.
*/
public Read<T> withIdAttribute(String idAttribute) {
return toBuilder().setIdAttribute(idAttribute).build();
return toBuilder().setIdAttribute(idAttribute).setValidate(true).build();
}

/**
Expand All @@ -1082,7 +1091,7 @@ public Read<T> withIdAttribute(String idAttribute) {
* PCollection#setCoder(Coder)}.
*/
public Read<T> withCoderAndParseFn(Coder<T> coder, SimpleFunction<PubsubMessage, T> parseFn) {
return toBuilder().setCoder(coder).setParseFn(parseFn).build();
return toBuilder().setCoder(coder).setParseFn(parseFn).setValidate(true).build();
}

/**
Expand All @@ -1095,9 +1104,15 @@ public Read<T> withErrorHandler(ErrorHandler<BadRecord, ?> badRecordErrorHandler
return toBuilder()
.setBadRecordErrorHandler(badRecordErrorHandler)
.setBadRecordRouter(BadRecordRouter.RECORDING_ROUTER)
.setValidate(true)
.build();
}

/** Disable validation of the existence of the topic. */
public Read<T> withoutValidation() {
return toBuilder().setValidate(false).build();
}

@VisibleForTesting
/**
* Set's the internal Clock.
Expand Down Expand Up @@ -1258,6 +1273,35 @@ public void process() {
return read.setCoder(getCoder());
}

@Override
public void validate(PipelineOptions options) {
if (!getValidate()) {
return;
}

PubsubOptions psOptions = options.as(PubsubOptions.class);

// Validate the existence of the topic.
if (getTopicProvider() != null) {
PubsubTopic topic = getTopicProvider().get();
boolean topicExists = true;
try (PubsubClient pubsubClient =
getPubsubClientFactory()
.newClient(getTimestampAttribute(), getIdAttribute(), psOptions)) {
topicExists =
pubsubClient.isTopicExists(
PubsubClient.topicPathFromName(topic.project, topic.topic));
} catch (Exception e) {
throw new RuntimeException(e);
}

if (!topicExists) {
throw new IllegalArgumentException(
String.format("Pubsub topic '%s' does not exist.", topic));
}
}
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.Pubsub.Projects.Subscriptions;
Expand Down Expand Up @@ -310,6 +311,19 @@ public List<TopicPath> listTopics(ProjectPath project) throws IOException {
return topics;
}

@Override
public boolean isTopicExists(TopicPath topic) throws IOException {
try {
pubsub.projects().topics().get(topic.getPath()).execute();
return true;
} catch (GoogleJsonResponseException e) {
if (e.getStatusCode() == 404) {
return false;
}
throw e;
}
}

@Override
public void createSubscription(
TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,11 @@ public List<TopicPath> listTopics(ProjectPath project) throws IOException {
throw new UnsupportedOperationException();
}

@Override
public boolean isTopicExists(TopicPath topic) throws IOException {
throw new UnsupportedOperationException();
}

@Override
public void createSubscription(
TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.google.pubsub.v1.Topic;
import io.grpc.ManagedChannel;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
Expand Down Expand Up @@ -432,4 +433,43 @@ public void getSchema(GetSchemaRequest request, StreamObserver<Schema> responseO
server.shutdownNow();
}
}

@Test
public void isTopicExists() throws IOException {
initializeClient(null, null);
TopicPath topicDoesNotExist =
PubsubClient.topicPathFromPath("projects/testProject/topics/dontexist");
TopicPath topicExists = PubsubClient.topicPathFromPath("projects/testProject/topics/exist");

PublisherImplBase publisherImplBase =
new PublisherImplBase() {
@Override
public void getTopic(GetTopicRequest request, StreamObserver<Topic> responseObserver) {
String topicPath = request.getTopic();
if (topicPath.equals(topicDoesNotExist.getPath())) {
responseObserver.onError(
new StatusRuntimeException(Status.fromCode(Status.Code.NOT_FOUND)));
}
if (topicPath.equals(topicExists.getPath())) {
responseObserver.onNext(
Topic.newBuilder()
.setName(topicPath)
.setSchemaSettings(
SchemaSettings.newBuilder().setSchema(SCHEMA.getPath()).build())
.build());
responseObserver.onCompleted();
}
}
};
Server server =
InProcessServerBuilder.forName(channelName).addService(publisherImplBase).build().start();
try {
assertEquals(false, client.isTopicExists(topicDoesNotExist));

assertEquals(true, client.isTopicExists(topicExists));

} finally {
server.shutdownNow();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.junit.runners.model.Statement;
import org.mockito.Mockito;

/** Tests for PubsubIO Read and Write transforms. */
@RunWith(JUnit4.class)
Expand Down Expand Up @@ -928,4 +929,72 @@ public void testBigMessageBounded() throws IOException {
pipeline.run();
}
}

@Test
public void testValidate() throws IOException {
PubsubOptions options = TestPipeline.testingPipelineOptions().as(PubsubOptions.class);
TopicPath existingTopic = PubsubClient.topicPathFromName("test-project", "testTopic");
PubsubClient mockClient = Mockito.mock(PubsubClient.class);
Mockito.when(mockClient.isTopicExists(existingTopic)).thenReturn(true);
PubsubClient.PubsubClientFactory mockFactory =
Mockito.mock(PubsubClient.PubsubClientFactory.class);
Mockito.when(mockFactory.newClient("myTimestamp", "myId", options)).thenReturn(mockClient);

Read<PubsubMessage> read =
Read.newBuilder()
.setTopicProvider(
StaticValueProvider.of(
PubsubIO.PubsubTopic.fromPath("projects/test-project/topics/testTopic")))
.setTimestampAttribute("myTimestamp")
.setIdAttribute("myId")
.setPubsubClientFactory(mockFactory)
.setCoder(PubsubMessagePayloadOnlyCoder.of())
.build();

read.validate(options);
}

@Test
public void testValidateTopicIsNotExists() throws Exception {
thrown.expect(IllegalArgumentException.class);

PubsubOptions options = TestPipeline.testingPipelineOptions().as(PubsubOptions.class);
TopicPath nonExistingTopic = PubsubClient.topicPathFromName("test-project", "nonExistingTopic");
PubsubClient mockClient = Mockito.mock(PubsubClient.class);
Mockito.when(mockClient.isTopicExists(nonExistingTopic)).thenReturn(false);
PubsubClient.PubsubClientFactory mockFactory =
Mockito.mock(PubsubClient.PubsubClientFactory.class);
Mockito.when(mockFactory.newClient("myTimestamp", "myId", options)).thenReturn(mockClient);

Read<PubsubMessage> read =
Read.newBuilder()
.setTopicProvider(
StaticValueProvider.of(
PubsubIO.PubsubTopic.fromPath("projects/test-project/topics/nonExistingTopic")))
.setTimestampAttribute("myTimestamp")
.setIdAttribute("myId")
.setPubsubClientFactory(mockFactory)
.setCoder(PubsubMessagePayloadOnlyCoder.of())
.build();

read.validate(options);
}

@Test
public void testWithoutValidation() throws IOException {
PubsubOptions options = TestPipeline.testingPipelineOptions().as(PubsubOptions.class);
TopicPath nonExistingTopic = PubsubClient.topicPathFromName("test-project", "nonExistingTopic");
PubsubClient mockClient = Mockito.mock(PubsubClient.class);
Mockito.when(mockClient.isTopicExists(nonExistingTopic)).thenReturn(false);
PubsubClient.PubsubClientFactory mockFactory =
Mockito.mock(PubsubClient.PubsubClientFactory.class);
Mockito.when(mockFactory.newClient("myTimestamp", "myId", options)).thenReturn(mockClient);

Read<PubsubMessage> read =
PubsubIO.readMessages()
.fromTopic("projects/test-project/topics/nonExistingTopic")
.withoutValidation();

read.validate(options);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.when;

import com.google.api.client.googleapis.json.GoogleJsonError;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.http.HttpHeaders;
import com.google.api.client.http.HttpResponseException;
import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.Pubsub.Projects.Subscriptions;
import com.google.api.services.pubsub.Pubsub.Projects.Topics;
Expand Down Expand Up @@ -308,6 +312,26 @@ private static Topic buildTopic(int i) {
return topic;
}

@Test
public void isTopicExists() throws Exception {
TopicPath topicExists =
PubsubClient.topicPathFromPath("projects/testProject/topics/topicExists");
TopicPath topicDoesNotExist =
PubsubClient.topicPathFromPath("projects/testProject/topics/topicDoesNotExist");
HttpResponseException.Builder builder =
new HttpResponseException.Builder(404, "topic is not found", new HttpHeaders());
GoogleJsonError error = new GoogleJsonError();
when(mockPubsub.projects().topics().get(topicExists.getPath()).execute())
.thenReturn(new Topic().setName(topicExists.getName()));
when(mockPubsub.projects().topics().get(topicDoesNotExist.getPath()).execute())
.thenThrow(new GoogleJsonResponseException(builder, error));

client = new PubsubJsonClient(null, null, mockPubsub);

assertEquals(true, client.isTopicExists(topicExists));
assertEquals(false, client.isTopicExists(topicDoesNotExist));
}

@Test
public void listSubscriptions() throws Exception {
ListSubscriptionsResponse expectedResponse1 = new ListSubscriptionsResponse();
Expand Down
Loading