Skip to content

Hprose 过滤器

小马哥 edited this page Jul 2, 2016 · 9 revisions

简介

有时候,我们可能会希望在远程过程调用中对通讯的一些细节有更多的控制,比如对传输中的数据进行加密、压缩、签名、跟踪、协议转换等等,但是又希望这些工作能够跟服务函数/方法本身可以解耦。这个时候,Hprose 过滤器就是一个不错的选择。

Hprose 过滤器是一个接口,它定义在 hprose.common 包中,它有两个方法,该接口定义如下:

public interface HproseFilter {
    ByteBuffer inputFilter(ByteBuffer data, HproseContext context);
    ByteBuffer outputFilter(ByteBuffer data, HproseContext context);
}

其中 inputFilter 的作用是对输入数据进行处理,outputFilter 的作用是对输出数据进行处理。

data 参数就是输入输出数据。这两个方法的返回值表示已经处理过的数据,如果你不打算对数据进行修改,你可以直接将 data 参数作为返回值返回。

context 参数是调用的上下文对象,我们在服务器和客户端的介绍中已经多次提到过它。

执行顺序

不论是客户端,还是服务器,都可以添加多个过滤器。假设我们按照添加的顺序把它们叫做 filter1, filter2, ... filterN。那么它们的执行顺序是这样的。

在客户端的执行顺序

+------------------- outputFilter -------------------+
| +-------+      +-------+                 +-------+ |
| |filter1|----->|filter2|-----> ... ----->|filterN| |---------+
| +-------+      +-------+                 +-------+ |         v
+----------------------------------------------------+ +---------------+
                                                       | Hprose Server |
+-------------------- inputFilter -------------------+ +---------------+
| +-------+      +-------+                 +-------+ |         |
| |filter1|<-----|filter2|<----- ... <-----|filterN| |<--------+
| +-------+      +-------+                 +-------+ |
+----------------------------------------------------+

在服务器端的执行顺序

                  +-------------------- inputFilter -------------------+
                  | +-------+                 +-------+      +-------+ |
        +-------->| |filterN|-----> ... ----->|filter2|----->|filter1| |
        |         | +-------+                 +-------+      +-------+ |
+---------------+ +----------------------------------------------------+
| Hprose Client |                                                     
+---------------+ +------------------- outputFilter -------------------+
        ^         | +-------+                 +-------+      +-------+ |
        +---------| |filterN|<----- ... <-----|filter2|<-----|filter1| |
                  | +-------+                 +-------+      +-------+ |
                  +----------------------------------------------------+

跟踪调试

有时候我们在调试过程中,可能会需要查看输入输出数据。用抓包工具抓取数据当然是一个办法,但是使用 过滤器可以更方便更直接的显示出输入输出数据。

LogFilter.java

package hprose.example.filter.log;

import hprose.common.HproseContext;
import hprose.common.HproseFilter;
import hprose.util.StrUtil;
import java.nio.ByteBuffer;
import java.util.logging.Level;
import java.util.logging.Logger;

public class LogFilter implements HproseFilter {
    private static final Logger logger = Logger.getLogger(LogFilter.class.getName());
    @Override
    public ByteBuffer inputFilter(ByteBuffer data, HproseContext context) {
        logger.log(Level.INFO, StrUtil.toString(data));
        return data;
    }
    @Override
    public ByteBuffer outputFilter(ByteBuffer data, HproseContext context) {
        logger.log(Level.INFO, StrUtil.toString(data));
        return data;
    }
}

Server.java

package hprose.example.filter.log;

import hprose.server.HproseTcpServer;
import java.io.IOException;
import java.net.URISyntaxException;

public class Server {
    public static String hello(String name) {
        return "Hello " + name + "!";
    }
    public static void main(String[] args) throws URISyntaxException, IOException {
        HproseTcpServer server = new HproseTcpServer("tcp://0.0.0.0:8082");
        server.add("hello", Server.class);
        server.addFilter(new LogFilter());
        server.start();
        System.out.println("START");
        System.in.read();
        server.stop();
        System.out.println("STOP");
    }
}

Client.java

package hprose.example.filter.log;

import hprose.client.HproseClient;
import java.io.IOException;
import java.net.URISyntaxException;

interface IHello {
    String hello(String name);
}
public class Client {
    public static void main(String[] args) throws URISyntaxException, IOException {
        HproseClient client = HproseClient.create("tcp://127.0.0.1:8082");
        client.addFilter(new LogFilter());
        IHello h = client.useService(IHello.class);
        System.out.println(h.hello("World"));
    }
}

然后分别启动服务器和客户端,就会看到如下输出:

服务器输出

START
七月 02, 2016 11:32:19 上午 hprose.example.filter.log.LogFilter inputFilter
信息: Cs5"hello"a1{s5"World"}z
七月 02, 2016 11:32:19 上午 hprose.example.filter.log.LogFilter outputFilter
信息: Rs12"Hello World!"z

客户端输出

七月 02, 2016 11:32:19 上午 hprose.example.filter.log.LogFilter outputFilter
信息: Cs5"hello"a1{s5"World"}z
七月 02, 2016 11:32:19 上午 hprose.example.filter.log.LogFilter inputFilter
信息: Rs12"Hello World!"z
Hello World!

压缩传输

上面的例子,我们只使用了一个过滤器。在本例中,我们展示多个过滤器组合使用的效果。

CompressFilter.java

package hprose.example.filter.compress;

import hprose.common.HproseContext;
import hprose.common.HproseFilter;
import hprose.io.ByteBufferStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class CompressFilter implements HproseFilter {
    private static final Logger logger = Logger.getLogger(CompressFilter.class.getName());
    @Override
    public ByteBuffer inputFilter(ByteBuffer data, HproseContext context) {
        ByteBufferStream is = new ByteBufferStream(data);
        ByteBufferStream os = new ByteBufferStream();
        try {
            GZIPInputStream gis = new GZIPInputStream(is.getInputStream());
            os.readFrom(gis);
        }
        catch (IOException ex) {
            logger.log(Level.SEVERE, null, ex);
        }
        return os.buffer;
    }
    @Override
    public ByteBuffer outputFilter(ByteBuffer data, HproseContext context) {
        ByteBufferStream is = new ByteBufferStream(data);
        ByteBufferStream os = new ByteBufferStream();
        try {
            GZIPOutputStream gos = new GZIPOutputStream(os.getOutputStream());
            is.writeTo(gos);
            gos.finish();
        }
        catch (IOException ex) {
            logger.log(Level.SEVERE, null, ex);
        }
        return os.buffer;
    }
}

SizeFilter.java

package hprose.example.filter.compress;

import hprose.common.HproseContext;
import hprose.common.HproseFilter;
import java.nio.ByteBuffer;
import java.util.logging.Level;
import java.util.logging.Logger;

public class SizeFilter implements HproseFilter {
    private static final Logger logger = Logger.getLogger(SizeFilter.class.getName());
    private String message = "";
    public SizeFilter(String message) {
        this.message = message;
    }
    @Override
    public ByteBuffer inputFilter(ByteBuffer data, HproseContext context) {
        logger.log(Level.INFO, message + " input size: {0}", data.remaining());
        return data;
    }
    @Override
    public ByteBuffer outputFilter(ByteBuffer data, HproseContext context) {
        logger.log(Level.INFO, message + " output size: {0}", data.remaining());
        return data;
    }
}

Server.java

package hprose.example.filter.compress;

import hprose.server.HproseTcpServer;
import java.io.IOException;
import java.net.URISyntaxException;

public class Server {
    public static Object echo(Object obj) {
        return obj;
    }
    public static void main(String[] args) throws URISyntaxException, IOException {
        HproseTcpServer server = new HproseTcpServer("tcp://0.0.0.0:8083");
        server.add("echo", Server.class);
        server.addFilter(new SizeFilter("Non compressed"));
        server.addFilter(new CompressFilter());
        server.addFilter(new SizeFilter("Compressed"));
        server.start();
        System.out.println("START");
        System.in.read();
        server.stop();
        System.out.println("STOP");
    }
}

Client.java

package hprose.example.filter.compress;

import hprose.client.HproseClient;
import java.io.IOException;
import java.net.URISyntaxException;

interface IEcho {
    int[] echo(int[] obj);
}
public class Client {
    public static void main(String[] args) throws URISyntaxException, IOException {
        HproseClient client = HproseClient.create("tcp://127.0.0.1:8083");
        client.addFilter(new SizeFilter("Non compressed"));
        client.addFilter(new CompressFilter());
        client.addFilter(new SizeFilter("Compressed"));
        IEcho h = client.useService(IEcho.class);
        int n = 100000;
        int[] value = new int[n];
        for (int i = 0; i < n; ++i) {
            value[i] = i;
        }
        System.out.println(h.echo(value).length);
    }
}

然后分别启动服务器和客户端,就会看到如下输出:

服务器输出

START
七月 02, 2016 1:30:06 下午 hprose.example.filter.compress.SizeFilter inputFilter
信息: Compressed input size: 213,178
七月 02, 2016 1:30:06 下午 hprose.example.filter.compress.SizeFilter inputFilter
信息: Non compressed input size: 688,893
七月 02, 2016 1:30:06 下午 hprose.example.filter.compress.SizeFilter outputFilter
信息: Non compressed output size: 688,881
七月 02, 2016 1:30:06 下午 hprose.example.filter.compress.SizeFilter outputFilter
信息: Compressed output size: 213,154

客户端输出

七月 02, 2016 1:30:05 下午 hprose.example.filter.compress.SizeFilter outputFilter
信息: Non compressed output size: 688,893
七月 02, 2016 1:30:05 下午 hprose.example.filter.compress.SizeFilter outputFilter
信息: Compressed output size: 213,178
七月 02, 2016 1:30:06 下午 hprose.example.filter.compress.SizeFilter inputFilter
信息: Compressed input size: 213,154
七月 02, 2016 1:30:06 下午 hprose.example.filter.compress.SizeFilter inputFilter
信息: Non compressed input size: 688,881
100000

在这个例子中,我们直接使用了 Java 内置的压缩算法库。读者可以根据自己的需求替换为其它压缩算法。

加密跟这个类似,这里就不再单独举加密的例子了。

运行时间统计

有时候,我们希望能够对调用执行时间做一个统计,对于客户端来说,也就是客户端调用发出前,到客户端收到调用结果的时间统计。对于服务器来说,就是收到客户端调用请求到要发出调用结果的这一段时间的统计。这个功能,通过过滤器也可以实现。

StatFilter.Java

package hprose.example.filter.stat;

import hprose.common.HproseContext;
import hprose.common.HproseFilter;
import java.nio.ByteBuffer;
import java.util.logging.Level;
import java.util.logging.Logger;

public class StatFilter implements HproseFilter {
    private static final Logger logger = Logger.getLogger(StatFilter.class.getName());
    private void stat(HproseContext context) {
        long now = System.currentTimeMillis();
        long starttime = context.getLong("starttime");
        if (starttime == 0) {
            context.setLong("starttime", now);
        }
        else {
            logger.log(Level.INFO, "It takes {0} ms.", now - starttime);
        }
    }
    @Override
    public ByteBuffer inputFilter(ByteBuffer data, HproseContext context) {
        stat(context);
        return data;
    }
    @Override
    public ByteBuffer outputFilter(ByteBuffer data, HproseContext context) {
        stat(context);
        return data;
    }
}

Server.java

package hprose.example.filter.stat;

import hprose.server.HproseTcpServer;
import java.io.IOException;
import java.net.URISyntaxException;

public class Server {
    public static Object echo(Object obj) {
        return obj;
    }
    public static void main(String[] args) throws URISyntaxException, IOException {
        HproseTcpServer server = new HproseTcpServer("tcp://0.0.0.0:8083");
        server.add("echo", Server.class);
        server.addFilter(new StatFilter());
        server.start();
        System.out.println("START");
        System.in.read();
        server.stop();
        System.out.println("STOP");
    }
}

Client.java

package hprose.example.filter.stat;

import hprose.client.HproseClient;
import java.io.IOException;
import java.net.URISyntaxException;

interface IEcho {
    int[] echo(int[] obj);
}
public class Client {
    public static void main(String[] args) throws URISyntaxException, IOException {
        HproseClient client = HproseClient.create("tcp://127.0.0.1:8083");
        client.addFilter(new StatFilter());
        IEcho h = client.useService(IEcho.class);
        int n = 100000;
        int[] value = new int[n];
        for (int i = 0; i < n; ++i) {
            value[i] = i;
        }
        System.out.println(h.echo(value).length);
    }
}

然后分别启动服务器和客户端,就会看到如下输出:

服务器输出

START
七月 02, 2016 2:08:34 下午 hprose.example.filter.stat.StatFilter stat
信息: It takes 118 ms.

客户端输出

七月 02, 2016 2:08:34 下午 hprose.example.filter.stat.StatFilter stat
信息: It takes 203 ms.
100000

在上面这个例子中我们还可以看到如何利用 HproseContext 来进行上下文信息的传递。

最后让我们把这个这个运行时间统计的例子跟上面的压缩例子结合一下,可以看到更详细的时间统计。

Server.java

package hprose.example.filter;

import hprose.example.filter.compress.CompressFilter;
import hprose.example.filter.compress.SizeFilter;
import hprose.example.filter.stat.StatFilter;
import hprose.server.HproseTcpServer;
import java.io.IOException;
import java.net.URISyntaxException;

public class Server {
    public static Object echo(Object obj) {
        return obj;
    }
    public static void main(String[] args) throws URISyntaxException, IOException {
        HproseTcpServer server = new HproseTcpServer("tcp://0.0.0.0:8084");
        server.add("echo", Server.class);
        server.addFilter(new StatFilter());
        server.addFilter(new SizeFilter("Non compressed"));
        server.addFilter(new CompressFilter());
        server.addFilter(new SizeFilter("Compressed"));
        server.addFilter(new StatFilter());
        server.start();
        System.out.println("START");
        System.in.read();
        server.stop();
        System.out.println("STOP");
    }
}

Client.java

package hprose.example.filter;

import hprose.client.HproseClient;
import hprose.example.filter.compress.CompressFilter;
import hprose.example.filter.compress.SizeFilter;
import hprose.example.filter.stat.StatFilter;
import java.io.IOException;
import java.net.URISyntaxException;

interface IEcho {
    int[] echo(int[] obj);
}
public class Client {
    public static void main(String[] args) throws URISyntaxException, IOException {
        HproseClient client = HproseClient.create("tcp://127.0.0.1:8084");
        client.addFilter(new StatFilter());
        client.addFilter(new SizeFilter("Non compressed"));
        client.addFilter(new CompressFilter());
        client.addFilter(new SizeFilter("Compressed"));
        client.addFilter(new StatFilter());
        IEcho h = client.useService(IEcho.class);
        int n = 100000;
        int[] value = new int[n];
        for (int i = 0; i < n; ++i) {
            value[i] = i;
        }
        System.out.println(h.echo(value).length);
    }
}

然后分别启动服务器和客户端,就会看到如下输出:

服务器输出

START
七月 02, 2016 2:27:39 下午 hprose.example.filter.compress.SizeFilter inputFilter
信息: Compressed input size: 213,178
七月 02, 2016 2:27:39 下午 hprose.example.filter.compress.SizeFilter inputFilter
信息: Non compressed input size: 688,893
七月 02, 2016 2:27:39 下午 hprose.example.filter.stat.StatFilter stat
信息: It takes 26 ms.
七月 02, 2016 2:27:39 下午 hprose.example.filter.stat.StatFilter stat
信息: It takes 119 ms.
七月 02, 2016 2:27:39 下午 hprose.example.filter.compress.SizeFilter outputFilter
信息: Non compressed output size: 688,881
七月 02, 2016 2:27:39 下午 hprose.example.filter.compress.SizeFilter outputFilter
信息: Compressed output size: 213,154
七月 02, 2016 2:27:39 下午 hprose.example.filter.stat.StatFilter stat
信息: It takes 144 ms.

客户端输出

七月 02, 2016 2:27:39 下午 hprose.example.filter.compress.SizeFilter outputFilter
信息: Non compressed output size: 688,893
七月 02, 2016 2:27:39 下午 hprose.example.filter.compress.SizeFilter outputFilter
信息: Compressed output size: 213,178
七月 02, 2016 2:27:39 下午 hprose.example.filter.stat.StatFilter stat
信息: It takes 42 ms.
七月 02, 2016 2:27:39 下午 hprose.example.filter.stat.StatFilter stat
信息: It takes 251 ms.
七月 02, 2016 2:27:39 下午 hprose.example.filter.compress.SizeFilter inputFilter
信息: Compressed input size: 213,154
七月 02, 2016 2:27:39 下午 hprose.example.filter.compress.SizeFilter inputFilter
信息: Non compressed input size: 688,881
七月 02, 2016 2:27:39 下午 hprose.example.filter.stat.StatFilter stat
信息: It takes 255 ms.
100000

在这里,我们可以看到客户端和服务器端分别输出了三段用时。

服务器端输出:

第一个 26 ms 是解压缩输入数据的时间。

第二个 119 ms 是第一个阶段用时 + 反序列化 + 调用 + 序列化的总时间。

第三个 144 ms 是前两个阶段用时 + 压缩输出数据的时间。

客户端输出:

第一个 42 ms 是压缩输出数据的时间。

第二个 251 ms 是第一个阶段用时 + 从客户端调用发出到服务器端返回数据的总时间。

第三个 255 ms 是前两个阶段用时 + 解压缩输入数据的时间。

看这个结果似乎序列化反序列化用时较长,实际上,如果你不关闭服务器的情况下,如果再运行一遍客户端,你会发现第二阶段用时会大大减少,大概有 10 倍的提升,主要原因是第一次运行时,需要加载很多类和进行初始化造成的。实际序列化和反序列化的用时总和比压缩时间要短很多。

Hprose 过滤器的功能很强大,除了上面这些用法之外,你还可以使用它进行协议转换,比如把 Hprose 服务转换成 JSONRPC 服务等,但是因为 Java 中没有内置 JSON 解析工具,这里就不举例了。另外,结合服务器事件来实现更为复杂的功能。不过这里也不再继续举例说明了。