Thrift框架服务端并发处理模式的java示例

2015-10-16 11:05:07   作者:MangoCool   来源:MangoCool

项目因为需要对外提供不同语言的接口,所以我们采用了高效、跨语言的RPC框架Thrift。因为用的爽!顺理成章继续沿用,但是这次并不是屡试不爽。项目的Thrift服务端没办法并发请求处理,一直都单线程的将非阻塞的客服端请求one by one的处理,如果请求处理时间长的话,就会出现请求高延时的情况。so bad! 不过XXX,终于XXX!以下就是我学习Thrift API之后找到的解决方案示例。

软件:Thrift-0.9.2

依赖:jdk1.7

开发环境:ideaIU-14.1.4

测试环境:win7

Thrift-0.9.2下载地址:http://www.apache.org/dyn/closer.cgi?path=/thrift/0.9.2/thrift-0.9.2.exe

建立maven工程ThriftDemo,在pom.xml配置文件添加必要的依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>ThriftDemo</groupId>
    <artifactId>ThriftDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>

        <dependency>
            <groupId>org.apache.thrift</groupId>
            <artifactId>libthrift</artifactId>
            <version>0.9.2</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.12</version>
        </dependency>

    </dependencies>

</project>

根据 Thrift 的语法规范编写脚本文件 UserService.thrift:

namespace java com.mangocool.thrift

struct User
{
    1:string id
    2:string name
    3:i32 sex
}

service UserService
{
    string whatIsName(1:string word)
    User userInfo(1:string id)
}
对于以上脚本和如何生成User.java和UserService.java在这里就不赘述了,之前的一片文章有,地址:一个简单的Thrift框架Java语言示例

实现类UserServiceImpl.java:

package com.mangocool.thrift;

import org.apache.thrift.TException;

/**
 * Created by MANGOCOOL on 2015/9/7.
 */
public class UserServiceImpl implements UserService.Iface
{

    @Override
    public String whatIsName(String word) throws TException {
        String name = "what talking about?";
        System.out.println("what your name?");
        if(!word.isEmpty())
        {
            try {
                for(int i=0; i<30; i++)
                {
                    System.out.println("wo..." + i);
                    Thread.sleep(500);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            name = "my name is mangocool!";
        }
        System.out.println("接收的参数word: " + word);
        return name;
    }

    @Override
    public User userInfo(String id) throws TException {
        User user = new User();
        if(!id.isEmpty() && id.equals("1023"))
        {
            user.setId("1023");
            user.setName("mangocool");
            user.setSex(1);
        } else
        {
            user.setName("no user!");
        }
        System.out.println("接收的参数id: " + id);
        return user;
    }
}
为方便稍后的并发测试,增加了休眠。

客户端UserServiceClient.java:

package com.mangocool.thrift;

import org.apache.thrift.TException;
import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.*;

import java.io.IOException;

/**
 * Created by MANGOCOOL on 2015/9/7.
 */
public class UserServiceClient {

    String address = "localhost";
    int port = 7911;
    int timeout = 100*1000;

    public void start()
    {
        //使用非阻塞方式,按块的大小进行传输,类似于Java中的NIO。记得调用close释放资源
        TTransport transport =
                new TFramedTransport(new TSocket(address, port, timeout));
        //高效率的、密集的二进制编码格式进行数据传输协议
        TProtocol protocol = new TCompactProtocol(transport);
        UserService.Client client = new UserService.Client(protocol);
        try {
            open(transport);
            System.out.println(client.whatIsName("hello!"));
            close(transport);
        } catch (TException e) {
            e.printStackTrace();
        }
    }

    public void open(TTransport transport)
    {
        if(transport != null && !transport.isOpen())
        {
            try {
                transport.open();
            } catch (TTransportException e) {
                e.printStackTrace();
            }
        }
    }

    public void close(TTransport transport)
    {
        if(transport != null && transport.isOpen())
        {
            transport.close();
        }
    }

    public static void main(String[] args) {
        UserServiceClient usc = new UserServiceClient();
        usc.start();
    }
}
服务端UserServiceServer.java:
package com.mangocool.thrift;

import org.apache.thrift.TProcessor;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.server.*;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TTransportException;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Created by MANGOCOOL on 2015/9/7.
 */
public class UserServiceServer {

    private int servicePort = 7911;

    public void invoke()
    {
        try {
            // 非阻塞式的,配合TFramedTransport使用
            TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(servicePort);
            // 关联处理器与Service服务的实现
            TProcessor processor = new UserService.Processor<UserService.Iface>(new UserServiceImpl());
            // 目前Thrift提供的最高级的模式,可并发处理客户端请求
            TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(serverTransport);
            args.processor(processor);
            // 设置协议工厂,高效率的、密集的二进制编码格式进行数据传输协议
            args.protocolFactory(new TCompactProtocol.Factory());
            // 设置传输工厂,使用非阻塞方式,按块的大小进行传输,类似于Java中的NIO
            args.transportFactory(new TFramedTransport.Factory());
            // 设置处理器工厂,只返回一个单例实例
            args.processorFactory(new TProcessorFactory(processor));
            // 多个线程,主要负责客户端的IO处理
            args.selectorThreads(2);
            // 工作线程池
            ExecutorService pool = Executors.newFixedThreadPool(3);
            args.executorService(pool);
            TThreadedSelectorServer server = new TThreadedSelectorServer(args);
            System.out.println("Starting server on port " + servicePort + "......");
            server.serve();
        } catch (TTransportException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        UserServiceServer uss = new UserServiceServer();
        uss.invoke();
    }
}
关于ExecutorService线程池的详解,也可以参考文章:Executor线程池详解

测试的时候,先启动服务端,然后启动多个客户端,就可以看到服务端并发处理请求的效果了。


学习文章:http://blog.csdn.net/houjixin/article/details/42779915

Github源码:https://github.com/apache/thrift/blob/master/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java

标签: Thrift Java 服务端 并发 Demo

分享:

上一篇itextpdf库真的也有生成pdf的目录(index)的方法

下一篇RESTful框架jersey的入门学习和简单示例

关于我

一个喜欢唱歌,热衷旅行,爱好电子产品的码农。没事,跟三五好友吼上几嗓子,约上几个背着行囊去露营或者宅在家里抱着孩子敲代码。

座右铭:当你的才华还撑不起你的野心的时候,你就应该静下心来学习,永不止步!

            人生之旅历途甚长,所争决不在一年半月,万不可因此着急失望,招精神之萎葸。


Copyright 芒果酷(mangocool.com) All rights reserved. 湘ICP备14019394号

免责声明:本网站部分文章转载其他媒体,意在为公众提供免费服务。如有信息侵犯了您的权益,可与本网站联系,本网站将尽快予以撤除。