Skip to content

Commit

Permalink
add general context data
Browse files Browse the repository at this point in the history
  • Loading branch information
wzy1935 committed Aug 8, 2024
1 parent 6873522 commit 5268b9c
Show file tree
Hide file tree
Showing 4 changed files with 197 additions and 19 deletions.
8 changes: 4 additions & 4 deletions src/main/java/examples/HttpProxyExamples.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,20 @@ public void proxy(Vertx vertx) {
proxyServer.requestHandler(proxy).listen(8080);
}

private SocketAddress resolveOriginAddress(HttpServerRequest request) {
private SocketAddress resolveOriginAddress(ProxyContext proxyContext) {
return null;
}

public void originSelector(HttpProxy proxy) {
proxy.originSelector(request -> Future.succeededFuture(resolveOriginAddress(request)));
proxy.originSelector(proxyContext -> Future.succeededFuture(resolveOriginAddress(proxyContext)));
}

private RequestOptions resolveOriginOptions(HttpServerRequest request) {
private RequestOptions resolveOriginOptions(ProxyContext proxyContext) {
return null;
}

public void originRequestProvider(HttpProxy proxy) {
proxy.originRequestProvider((request, client) -> client.request(resolveOriginOptions(request)));
proxy.originRequestProvider((proxyContext, client) -> client.request(resolveOriginOptions(proxyContext)));
}

public void inboundInterceptor(HttpProxy proxy) {
Expand Down
46 changes: 41 additions & 5 deletions src/main/java/io/vertx/httpproxy/HttpProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import io.vertx.core.net.SocketAddress;
import io.vertx.httpproxy.impl.ReverseProxy;

import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

Expand Down Expand Up @@ -76,16 +78,40 @@ default HttpProxy origin(int port, String host) {
return origin(SocketAddress.inetSocketAddress(port, host));
}

// /**
// * Set a selector that resolves the <i><b>origin</b></i> address based on the incoming HTTP request.
// *
// * @param selector the selector
// * @return a reference to this, so the API can be used fluently
// */
// @Fluent
// default HttpProxy originSelector(Function<HttpServerRequest, Future<SocketAddress>> selector) {
// return originRequestProvider((req, client) -> selector
// .apply(req)
// .flatMap(server -> client.request(new RequestOptions().setServer(server))));
// }
//
// /**
// * Set a provider that creates the request to the <i><b>origin</b></i> server based the incoming HTTP request.
// * Setting a provider overrides any origin selector previously set.
// *
// * @param provider the provider
// * @return a reference to this, so the API can be used fluently
// */
// @GenIgnore()
// @Fluent
// HttpProxy originRequestProvider(BiFunction<HttpServerRequest, HttpClient, Future<HttpClientRequest>> provider);

/**
* Set a selector that resolves the <i><b>origin</b></i> address based on the incoming HTTP request.
*
* @param selector the selector
* @return a reference to this, so the API can be used fluently
*/
@Fluent
default HttpProxy originSelector(Function<HttpServerRequest, Future<SocketAddress>> selector) {
return originRequestProvider((req, client) -> selector
.apply(req)
default HttpProxy originSelector(Function<ProxyContext, Future<SocketAddress>> selector) {
return originRequestProvider((context, client) -> selector
.apply(context)
.flatMap(server -> client.request(new RequestOptions().setServer(server))));
}

Expand All @@ -98,7 +124,7 @@ default HttpProxy originSelector(Function<HttpServerRequest, Future<SocketAddres
*/
@GenIgnore()
@Fluent
HttpProxy originRequestProvider(BiFunction<HttpServerRequest, HttpClient, Future<HttpClientRequest>> provider);
HttpProxy originRequestProvider(BiFunction<ProxyContext, HttpClient, Future<HttpClientRequest>> provider);

/**
* Add an interceptor to the interceptor chain.
Expand All @@ -114,6 +140,16 @@ default HttpProxy originSelector(Function<HttpServerRequest, Future<SocketAddres
*
* @param request the outbound {@code HttpServerRequest}
*/
void handle(HttpServerRequest request);
default void handle(HttpServerRequest request) {
handle(request, new HashMap<>());
}

/**
* Handle the <i><b>outbound</b></i> {@code HttpServerRequest}.
*
* @param request the outbound {@code HttpServerRequest}
* @param attachments the contextual data holder for {@code ProxyContext}. Must be mutable.
*/
void handle(HttpServerRequest request, Map<String, Object> attachments);

}
21 changes: 11 additions & 10 deletions src/main/java/io/vertx/httpproxy/impl/ReverseProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class ReverseProxy implements HttpProxy {
private final static Logger log = LoggerFactory.getLogger(ReverseProxy.class);
private final HttpClient client;
private final boolean supportWebSocket;
private BiFunction<HttpServerRequest, HttpClient, Future<HttpClientRequest>> selector = (req, client) -> Future.failedFuture("No origin available");
private BiFunction<ProxyContext, HttpClient, Future<HttpClientRequest>> selector = (req, client) -> Future.failedFuture("No origin available");
private final List<ProxyInterceptor> interceptors = new ArrayList<>();

public ReverseProxy(ProxyOptions options, HttpClient client) {
Expand All @@ -43,7 +43,7 @@ public ReverseProxy(ProxyOptions options, HttpClient client) {
}

@Override
public HttpProxy originRequestProvider(BiFunction<HttpServerRequest, HttpClient, Future<HttpClientRequest>> provider) {
public HttpProxy originRequestProvider(BiFunction<ProxyContext, HttpClient, Future<HttpClientRequest>> provider) {
selector = provider;
return this;
}
Expand All @@ -56,7 +56,7 @@ public HttpProxy addInterceptor(ProxyInterceptor interceptor) {


@Override
public void handle(HttpServerRequest request) {
public void handle(HttpServerRequest request, Map<String, Object> attachments) {
ProxyRequest proxyRequest = ProxyRequest.reverseProxy(request);

// Encoding sanity check
Expand All @@ -67,7 +67,7 @@ public void handle(HttpServerRequest request) {
}

boolean isWebSocket = supportWebSocket && request.canUpgradeToWebSocket();
Proxy proxy = new Proxy(proxyRequest, isWebSocket);
Proxy proxy = new Proxy(proxyRequest, isWebSocket, attachments);
proxy.filters = interceptors.listIterator();
proxy.sendRequest()
.recover(throwable -> {
Expand All @@ -91,21 +91,22 @@ private void end(ProxyRequest proxyRequest, int sc) {
.send();
}

private Future<HttpClientRequest> resolveOrigin(HttpServerRequest proxiedRequest) {
return selector.apply(proxiedRequest, client);
private Future<HttpClientRequest> resolveOrigin(ProxyContext context) {
return selector.apply(context, client);
}

private class Proxy implements ProxyContext {

private final ProxyRequest request;
private ProxyResponse response;
private final Map<String, Object> attachments = new HashMap<>();
private final Map<String, Object> attachments;
private ListIterator<ProxyInterceptor> filters;
private final boolean isWebSocket;

private Proxy(ProxyRequest request, boolean isWebSocket) {
private Proxy(ProxyRequest request, boolean isWebSocket, Map<String, Object> attachments) {
this.request = request;
this.isWebSocket = isWebSocket;
this.attachments = attachments;
}

@Override
Expand Down Expand Up @@ -140,7 +141,7 @@ public Future<ProxyResponse> sendRequest() {
} else {
if (isWebSocket) {
HttpServerRequest proxiedRequest = request().proxiedRequest();
return resolveOrigin(proxiedRequest).compose(request -> {
return resolveOrigin(this).compose(request -> {
request.setMethod(request().getMethod());
request.setURI(request().getURI());
request.headers().addAll(request().headers());
Expand Down Expand Up @@ -202,7 +203,7 @@ public Future<Void> sendResponse() {
}

private Future<ProxyResponse> sendProxyRequest(ProxyRequest proxyRequest) {
return resolveOrigin(proxyRequest.proxiedRequest()).compose(proxyRequest::send);
return resolveOrigin(this).compose(proxyRequest::send);
}

private Future<Void> sendProxyResponse(ProxyResponse response) {
Expand Down
141 changes: 141 additions & 0 deletions src/test/java/io/vertx/tests/ProxyContextTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package io.vertx.tests;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.http.*;
import io.vertx.core.net.SocketAddress;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.httpproxy.*;
import org.junit.Test;

import java.io.Closeable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
import java.util.function.Consumer;

public class ProxyContextTest extends ProxyTestBase {

private WebSocketClient wsClient;

public ProxyContextTest(ProxyOptions options) {
super(options);
}

@Override
public void tearDown(TestContext context) {
super.tearDown(context);
wsClient = null;
}

// same in TestBase, but allow to attach contexts
private Closeable startProxy(Consumer<HttpProxy> config, Map<String, Object> attachments) {
CompletableFuture<Closeable> res = new CompletableFuture<>();
vertx.deployVerticle(new AbstractVerticle() {
HttpClient proxyClient;
HttpServer proxyServer;
HttpProxy proxy;
@Override
public void start(Promise<Void> startFuture) {
proxyClient = vertx.createHttpClient(new HttpClientOptions(clientOptions));
proxyServer = vertx.createHttpServer(new HttpServerOptions(serverOptions));
proxy = HttpProxy.reverseProxy(proxyOptions, proxyClient);
config.accept(proxy);
proxyServer.requestHandler(request -> {
proxy.handle(request, attachments);
});
proxyServer.listen().onComplete(ar -> startFuture.handle(ar.mapEmpty()));
}
}).onComplete(ar -> {
if (ar.succeeded()) {
String id = ar.result();
res.complete(() -> {
CountDownLatch latch = new CountDownLatch(1);
vertx.undeploy(id).onComplete(ar2 -> latch.countDown());
try {
latch.await(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new AssertionError(e);
}
});
} else {
res.completeExceptionally(ar.cause());
}
});
try {
return res.get(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new AssertionError(e);
} catch (ExecutionException e) {
throw new AssertionError(e.getMessage());
} catch (TimeoutException e) {
throw new AssertionError(e);
}
}

@Test
public void testOriginSelector(TestContext ctx) {
Async latch = ctx.async();
SocketAddress backend = startHttpBackend(ctx, 8081, req -> {
req.response().end("end");
});
startProxy(proxy -> {
proxy.originSelector(context -> Future.succeededFuture(context.get("backend", SocketAddress.class)));
}, new HashMap<>(Map.of("backend", backend)));
vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
.compose(HttpClientRequest::send)
.onComplete(ctx.asyncAssertSuccess(resp -> {
ctx.assertEquals(resp.statusCode(), 200);
latch.complete();
}));
}

@Test
public void testOriginSelectorWebSocket(TestContext ctx) {
Async latch = ctx.async();
SocketAddress backend = startHttpBackend(ctx, 8081, req -> {
req.toWebSocket().onSuccess(ws -> {
ws.handler(ws::write);
});
});
startProxy(proxy -> {
proxy.originSelector(context -> Future.succeededFuture(context.get("backend", SocketAddress.class)));
}, new HashMap<>(Map.of("backend", backend)));
wsClient = vertx.createWebSocketClient();
wsClient.connect(8080, "localhost", "/")
.onComplete(ctx.asyncAssertSuccess(ws -> {
latch.complete();
}));
}

@Test
public void testInterceptor(TestContext ctx) {
Async latch = ctx.async();
SocketAddress backend = startHttpBackend(ctx, 8081, req -> {
if (!req.uri().equals("/new-uri")) {
req.response().setStatusCode(404).end();
}
req.response().end("end");
});
startProxy(proxy -> {
proxy.origin(backend)
.addInterceptor(new ProxyInterceptor() {
@Override
public Future<ProxyResponse> handleProxyRequest(ProxyContext context) {
context.request().setURI(context.get("uri", String.class));
return context.sendRequest();
}
});
}, new HashMap<>(Map.of("uri", "/new-uri")));
vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
.compose(HttpClientRequest::send)
.onComplete(ctx.asyncAssertSuccess(resp -> {
ctx.assertEquals(resp.statusCode(), 200);
latch.complete();
}));
}
}

0 comments on commit 5268b9c

Please sign in to comment.