某山寨

z4zr的待调教小窝

用netty实现http下载 支持大文件

用netty实现http下载 支持大文件

Netty是一个java开源框架,提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序,是一个基于NIO的客户,服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,Netty 吸收了多种协议的实现经验,这些协议包括FTP,SMTP,HTTP,各种二进制,文本协议,并经过相当精心设计的项目,最终,Netty 成功的找到了一种方式,在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性。

可以自己定义基于TCP、UDP的各种通信协议0.0
Click To Netty官网

下面的例子是用netty实现的http文件下载服务端和客户端,应用netty4.0实现。通过实例化直接可以使用。
sever端是精简后的官方的例子,原版Link-Github

package netty;

import handler.HttpChannelInitlalizer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class HttpFileListenServerBootstarp implements Runnable {
  private int port;

  public HttpFileListenServerBootstarp(int port) {
    super();
    this.port = port;
  }

  @Override
  public void run() {
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.group(bossGroup, workerGroup);
    serverBootstrap.channel(NioServerSocketChannel.class);
    //serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
    serverBootstrap.childHandler(new HttpChannelInitlalizer());
    try {
      ChannelFuture f = serverBootstrap.bind(port).sync();
      f.channel().closeFuture().sync();
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      bossGroup.shutdownGracefully();
      workerGroup.shutdownGracefully();
    }
  }
}
package handler;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;

public class HttpChannelInitlalizer extends ChannelInitializer<SocketChannel> {

  @Override
  protected void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();
    pipeline.addLast(new HttpServerCodec());
    pipeline.addLast(new HttpObjectAggregator(65536));
    pipeline.addLast(new ChunkedWriteHandler());
    pipeline.addLast(new HttpChannelHandler());
  }

}
package handler;
 
import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; 
import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; 
import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN; 
import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; 
import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; 
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; 
import io.netty.buffer.Unpooled; 
import io.netty.channel.ChannelFuture; 
import io.netty.channel.ChannelFutureListener; 
import io.netty.channel.ChannelHandlerContext; 
import io.netty.channel.ChannelProgressiveFuture; 
import io.netty.channel.ChannelProgressiveFutureListener; 
import io.netty.channel.SimpleChannelInboundHandler; 
import io.netty.handler.codec.http.DefaultFullHttpResponse; 
import io.netty.handler.codec.http.DefaultHttpResponse; 
import io.netty.handler.codec.http.FullHttpRequest; 
import io.netty.handler.codec.http.FullHttpResponse; 
import io.netty.handler.codec.http.HttpChunkedInput; 
import io.netty.handler.codec.http.HttpHeaders; 
import io.netty.handler.codec.http.HttpResponse; 
import io.netty.handler.codec.http.HttpResponseStatus; 
import io.netty.handler.codec.http.HttpVersion; 
import io.netty.handler.codec.http.LastHttpContent; 
import io.netty.handler.stream.ChunkedFile; 
import io.netty.util.CharsetUtil; 
import io.netty.util.internal.SystemPropertyUtil; 
   
import java.io.File; 
import java.io.FileNotFoundException; 
import java.io.RandomAccessFile; 
import java.io.UnsupportedEncodingException; 
import java.net.URLDecoder; 
import java.util.regex.Pattern; 
   
import javax.activation.MimetypesFileTypeMap; 
   
public class HttpChannelHandler extends SimpleChannelInboundHandler<FullHttpRequest> { 
    public static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy HH:mm:ss zzz"; 
    public static final String HTTP_DATE_GMT_TIMEZONE = "GMT"; 
    public static final int HTTP_CACHE_SECONDS = 60; 
   
    @Override 
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception { 
        // 监测解码情况 
        if (!request.getDecoderResult().isSuccess()) { 
            sendError(ctx, BAD_REQUEST); 
            return; 
        } 
        final String uri = request.getUri(); 
        final String path = sanitizeUri(uri); 
        System.out.println("get file:"+path);
        if (path == null) {
            sendError(ctx, FORBIDDEN);
            return;
        }
        //读取要下载的文件
        File file = new File(path);
        if (file.isHidden() || !file.exists()) { 
            sendError(ctx, NOT_FOUND); 
            return; 
        } 
        if (!file.isFile()) { 
            sendError(ctx, FORBIDDEN); 
            return; 
        }
        RandomAccessFile raf; 
        try { 
            raf = new RandomAccessFile(file, "r"); 
        } catch (FileNotFoundException ignore) { 
            sendError(ctx, NOT_FOUND); 
            return; 
        } 
        long fileLength = raf.length(); 
        HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        HttpHeaders.setContentLength(response, fileLength); 
        setContentTypeHeader(response, file); 
        //setDateAndCacheHeaders(response, file); 
        if (HttpHeaders.isKeepAlive(request)) { 
            response.headers().set("CONNECTION", HttpHeaders.Values.KEEP_ALIVE); 
        } 
   
        // Write the initial line and the header. 
        ctx.write(response);
   
        // Write the content. 
        //ChannelFuture sendFileFuture = 
        ctx.write(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)), ctx.newProgressivePromise()); 
//        sendFuture用于监视发送数据的状态
//        sendFileFuture.addListener(new ChannelProgressiveFutureListener() { 
//            @Override 
//            public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) { 
//                if (total < 0) { // total unknown 
//                    System.err.println(future.channel() + " Transfer progress: " + progress); 
//                } else { 
//                    System.err.println(future.channel() + " Transfer progress: " + progress + " / " + total); 
//                } 
//            } 
//   
//            @Override 
//            public void operationComplete(ChannelProgressiveFuture future) { 
//                System.err.println(future.channel() + " Transfer complete."); 
//            } 
//        }); 
   
        // Write the end marker 
        ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); 
   
        // Decide whether to close the connection or not. 
        if (!HttpHeaders.isKeepAlive(request)) { 
            // Close the connection when the whole content is written out. 
            lastContentFuture.addListener(ChannelFutureListener.CLOSE); 
        } 
    }
   
    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 
        cause.printStackTrace(); 
        if (ctx.channel().isActive()) { 
            sendError(ctx, INTERNAL_SERVER_ERROR); 
        } 
        ctx.close();
    } 
   
    private static final Pattern INSECURE_URI = Pattern.compile(".*[<>&\"].*"); 
   
    private static String sanitizeUri(String uri) { 
        // Decode the path. 
        try { 
            uri = URLDecoder.decode(uri, "UTF-8"); 
        } catch (UnsupportedEncodingException e) { 
            throw new Error(e); 
        } 
   
        if (!uri.startsWith("/")) { 
            return null; 
        } 
   
        // Convert file separators. 
        uri = uri.replace('/', File.separatorChar); 
   
        // Simplistic dumb security check. 
        // You will have to do something serious in the production environment. 
        if (uri.contains(File.separator + '.') || uri.contains('.' + File.separator) || uri.startsWith(".") || uri.endsWith(".") 
                || INSECURE_URI.matcher(uri).matches()) { 
            return null; 
        } 
   
        // Convert to absolute path. 
        return SystemPropertyUtil.get("user.dir") + File.separator + uri; 
    } 
   

    private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) { 
        FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status, Unpooled.copiedBuffer("Failure: " + status + "\r\n", CharsetUtil.UTF_8)); 
        response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8"); 
   
        // Close the connection as soon as the error message is sent. 
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); 
    } 

    /**
     * Sets the content type header for the HTTP Response
     * 
     * @param response
     *            HTTP response
     * @param file
     *            file to extract content type
     */ 
    private static void setContentTypeHeader(HttpResponse response, File file) {
      MimetypesFileTypeMap m = new MimetypesFileTypeMap();
      String contentType = m.getContentType(file.getPath());
        if (!contentType.equals("application/octet-stream")) {
          contentType += "; charset=utf-8";
        }
        response.headers().set(CONTENT_TYPE, contentType);
    } 
   
}

这里开始是客户端

package netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequestEncoder;
import io.netty.handler.codec.http.HttpResponseDecoder;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.stream.ChunkedWriteHandler;

import java.net.URI;

public class HttpDownloadClient {
  /**
   * 下载http资源 向服务器下载直接填写要下载的文件的相对路径
   *        (↑↑↑建议只使用字母和数字对特殊字符对字符进行部分过滤可能导致异常↑↑↑)
   *        向互联网下载输入完整路径
   * @param host 目的主机ip或域名
   * @param port 目标主机端口
   * @param url 文件路径
   * @param local 本地存储路径
   * @throws Exception
   */
  public void connect(String host, int port, String url, final String local) throws Exception {
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
      Bootstrap b = new Bootstrap();
      b.group(workerGroup);
      b.channel(NioSocketChannel.class);
      b.option(ChannelOption.SO_KEEPALIVE, true);
      b.handler(new ChildChannelHandler(local));

      // Start the client.
      ChannelFuture f = b.connect(host, port).sync();

      URI uri = new URI(url);
      DefaultFullHttpRequest request = new DefaultFullHttpRequest(
          HttpVersion.HTTP_1_1, HttpMethod.GET, uri.toASCIIString());

      // 构建http请求
      request.headers().set(HttpHeaders.Names.HOST, host);
      request.headers().set(HttpHeaders.Names.CONNECTION,
          HttpHeaders.Values.KEEP_ALIVE);
      request.headers().set(HttpHeaders.Names.CONTENT_LENGTH,
          request.content().readableBytes());
      // 发送http请求
      f.channel().write(request);
      f.channel().flush();
      f.channel().closeFuture().sync();
    } finally {
      workerGroup.shutdownGracefully();
    }

  }

  private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
    String local;
    public ChildChannelHandler(String local) {
      this.local = local;
    }
    
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
      // 客户端接收到的是httpResponse响应,所以要使用HttpResponseDecoder进行解码
      ch.pipeline().addLast(new HttpResponseDecoder());
      // 客户端发送的是httprequest,所以要使用HttpRequestEncoder进行编码
      ch.pipeline().addLast(new HttpRequestEncoder());
      ch.pipeline().addLast(new ChunkedWriteHandler());
      ch.pipeline().addLast(new HttpDownloadHandler(local));
    }
    
  }
  public static void main(String[] args) throws Exception {
    HttpDownloadClient client = new HttpDownloadClient();
    client.connect("127.0.0.1", 9003,"/file/ppppp","ppppp");
    //client.connect("zlysix.gree.com", 80, "http://zlysix.gree.com/HelloWeb/download/20m.apk", "20m.apk");

  }
}
package netty;

import java.io.File;
import java.io.FileOutputStream;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpContent;
//import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.internal.SystemPropertyUtil;

public class HttpDownloadHandler extends ChannelInboundHandlerAdapter {
  private boolean readingChunks = false; // 分块读取开关
  private FileOutputStream fOutputStream = null;// 文件输出流
  private File localfile = null;// 下载文件的本地对象
  private String local = null;// 待下载文件名
  private int succCode;// 状态码

  public HttpDownloadHandler(String local) {
    this.local = local;
  }

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg)
      throws Exception {
    if (msg instanceof HttpResponse) {// response头信息
      HttpResponse response = (HttpResponse) msg;
      succCode = response.getStatus().code();
      if (succCode == 200) {
        setDownLoadFile();// 设置下载文件
        readingChunks = true;
      }
      // System.out.println("CONTENT_TYPE:"
      // + response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
    }
    if (msg instanceof HttpContent) {// response体信息
      HttpContent chunk = (HttpContent) msg;
      if (chunk instanceof LastHttpContent) {
        readingChunks = false;
      }

      ByteBuf buffer = chunk.content();
      byte[] dst = new byte[buffer.readableBytes()];
      if (succCode == 200) {
        while (buffer.isReadable()) {
          buffer.readBytes(dst);
          fOutputStream.write(dst);
          buffer.release();
        }
        if (null != fOutputStream) {
          fOutputStream.flush();
        }
      }

    }
    if (!readingChunks) {
      if (null != fOutputStream) {
        System.out.println("Download done->"+ localfile.getAbsolutePath());
        fOutputStream.flush();
        fOutputStream.close();
        localfile = null;
        fOutputStream = null;
      }
      ctx.channel().close();
    }
  }

  /**
   * 配置本地参数,准备下载
   */
  private void setDownLoadFile() throws Exception {
    if (null == fOutputStream) {
      local = SystemPropertyUtil.get("user.dir") + File.separator +local;
      //System.out.println(local);
      localfile = new File(local);
      if (!localfile.exists()) {
        localfile.createNewFile();
      }
      fOutputStream = new FileOutputStream(localfile);
    }
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
      throws Exception {
    System.out.println("管道异常:" + cause.getMessage());
    cause.printStackTrace();
    ctx.channel().close();
  }
}

文章二维码