HttpServiceDispatcher
作者:互联网
package wm.http;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
import io.netty.handler.codec.http.multipart.FileUpload;
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
import io.netty.handler.codec.http.multipart.InterfaceHttpData;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.AsciiString;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.tikv.shade.com.google.gson.JsonObject;
import wm.http.jsch.JSchConnect;
import wm.http.service.Request;
import wm.http.service.RequestInteceptor;
import wm.http.service.Response;
import wm.http.service.ServiceContext;
import wm.tools.Cmder;
import wm.tools.Mimetypes;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.net.http.WebSocket;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import static io.netty.handler.codec.http.HttpHeaderNames.*;
/**
*
* Dispatch http service methods
* @author
*
*/
public class HttpServiceDispatcher extends SimpleChannelInboundHandler<Object> {
private final static Logger logger = LogManager.getLogger(HttpServer.class);
private static Map<Channel, JSchConnect> cmderMap = new ConcurrentHashMap<>();
private final ServiceContext ctx;
static {
new Thread(()->{
while (true){
try {
cmderMap.values().forEach(v -> {
try {
v.read();
}catch (Exception e){
}
});
Thread.sleep(50);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
public HttpServiceDispatcher(ServiceContext ctx) {
this.ctx = ctx;
}
private HttpRequest request;
private Request req;
private WebSocketServerHandshaker handshaker;
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HttpRequest) {
HttpRequest request = this.request = (HttpRequest) msg;
if ("/ws".equalsIgnoreCase(request.uri())) {
System.out.println("websocket 请求接入");
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
"/ws", null, false);
handshaker = wsFactory.newHandshaker(request);
if (handshaker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
} else {
handshaker.handshake(ctx.channel(), request);
Channel channel = ctx.channel();
JSchConnect jSchConnect = new JSchConnect(channel);
cmderMap.put(channel, jSchConnect);
}
}else {
this.req = new Request(((InetSocketAddress)(ctx.channel().remoteAddress())).getAddress(), request.uri(), request.headers(), request.method());
if ( HttpUtil.is100ContinueExpected(request)) {
send100Continue(ctx);
}
req.appendDecoderResult(request.decoderResult());
}
}else if (msg instanceof HttpContent) {
HttpContent httpContent = (HttpContent) msg;
ByteBuf content = httpContent.content();
if (content.isReadable() && content.capacity() > 0) {
req.appendContent(content);
req.appendDecoderResult(request.decoderResult());
}
if (msg instanceof LastHttpContent) {
service(ctx);
}
}else if (msg instanceof WebSocketFrame){
handleWebSocketRequest(ctx, (WebSocketFrame) msg);
}
}
// 处理Websocket的代码
private void handleWebSocketRequest(ChannelHandlerContext ctx, WebSocketFrame frame) {
// 判断是否是关闭链路的指令
Channel channel = ctx.channel();
if (frame instanceof CloseWebSocketFrame) {
JSchConnect connect = cmderMap.remove(channel);
connect.close();
handshaker.close(channel, (CloseWebSocketFrame) frame.retain());
return;
}
// 判断是否是Ping消息
if (frame instanceof PingWebSocketFrame) {
channel.write(new PongWebSocketFrame(frame.content().retain()));
return;
}
// 文本消息,不支持二进制消息
if (frame instanceof TextWebSocketFrame) {
//请求text
String msg = ((TextWebSocketFrame) frame).text();
logger.info("re:" + msg);
JSchConnect cmder = cmderMap.get(channel);
if (!cmder.isInit()){
cmder.connect("root", "zhenyu.yin", "10.12.7.157", 22);
}else {
cmder.write(msg);
}
}
}
private void service(ChannelHandlerContext ctx) {
if(req.isSuccess()) {
ServiceContext.ServiceInvoker invoker = this.ctx.get(req.path());
if(invoker == null) {
sendAsFile(ctx);
} else {
doService(ctx, invoker);
}
} else {
sendBadRequestResponse(ctx);
}
}
protected void doService(ChannelHandlerContext ctx, ServiceContext.ServiceInvoker invoker) {
RequestInteceptor interceptor = this.ctx.Interceptor();
Response intecept = interceptor.intecept(req);
if(intecept != null) {
sendResponse(ctx, intecept);
return;
}
try {
sendResponse(ctx, invoker.invoke(req));
} catch (Exception ex) {
Response response = this.ctx.errorHandler().handle(req, ex);
sendResponse(ctx, response);
}
}
protected void sendAsFile(ChannelHandlerContext ctx) {
String path = null;
try {
path = URLDecoder.decode(req.path(), "UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
String file = this.ctx.fileHandler().getPath(path);
if(file != null) {
try {
sendFileResponse(ctx, file);
} catch (Exception ex) {
sendResponse(ctx, this.ctx.errorHandler().handle(req, ex));
}
} else {
sendNotFoudResponse(ctx);
}
}
protected void sendResponse(ChannelHandlerContext ctx, Response resp) {
sendContentResponse(ctx, HttpResponseStatus.OK, resp);
}
protected void sendNotFoudResponse(ChannelHandlerContext ctx) {
logger.debug(req.path() + " not found");
sendContentResponse(ctx, HttpResponseStatus.NOT_FOUND, Response.NOT_FOUND);
}
protected void sendBadRequestResponse(ChannelHandlerContext ctx) {
sendContentResponse(ctx, HttpResponseStatus.BAD_REQUEST, Response.BAD_REQUEST);
}
protected void sendContentResponse(ChannelHandlerContext ctx, HttpResponseStatus status, Response resp) {
// Build the response object.
FullHttpResponse response = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1,
status,
resp.content.copy());
setContentType(response, resp.type.val);
Map<CharSequence, Object> headers = resp.headers;
if (headers != null && !headers.isEmpty()){
headers.forEach((k, v) ->{
response.headers().set(k, v);
});
}
// Write the response.
ctx.write(response);
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
protected void setContentType(HttpResponse response, String contentType) {
response.headers().set(ACCESS_CONTROL_ALLOW_ORIGIN, "*");
response.headers().set(ACCESS_CONTROL_ALLOW_HEADERS, "*");
response.headers().set(CONTENT_TYPE, contentType);
// response.headers().set("Content-Encoding", "gzip, deflate");
response.headers().set(ACCESS_CONTROL_ALLOW_CREDENTIALS, true);
response.headers().set(TRANSFER_ENCODING , "chunked");
// response.headers().set(ACCEPT_ENCODING, "gzip");
}
public static final String HTTP_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
public static final int HTTP_CACHE_SECONDS = 60;
@SuppressWarnings("resource")
protected void sendFileResponse(ChannelHandlerContext ctx, String path) throws Exception {
File file = new File(path);
if (!file.exists() || !file.isFile() || file.isHidden()) {
sendNotFoudResponse(ctx);
return;
}
// Cache Validation
String ifModifiedSince = request.headers().get(HttpHeaders.Names.IF_MODIFIED_SINCE);
if (ifModifiedSince != null && !ifModifiedSince.isEmpty()) {
SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT);
Date ifModifiedSinceDate = dateFormatter.parse(ifModifiedSince);
// Only compare up to the second because the datetime format we send to the client
// does not have milliseconds
long ifModifiedSinceDateSeconds = ifModifiedSinceDate.getTime() / 1000;
long fileLastModifiedSeconds = file.lastModified() / 1000;
if (ifModifiedSinceDateSeconds == fileLastModifiedSeconds) {
sendNotModified(ctx);
return;
}
}
RandomAccessFile raf;
try {
raf = new RandomAccessFile(file, "r");
} catch (FileNotFoundException ignore) {
sendNotFoudResponse(ctx);
return;
}
long fileLength = raf.length();
HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
setContentType(response, Mimetypes.get(file.getName()));
response.headers().set(CONTENT_LENGTH, fileLength);
// response.headers().set(new AsciiString("file-name"), new AsciiString(file.getName()));
setDateAndCacheHeaders(response, file);
// Write the initial line and the header.
ctx.write(response);
if(!HttpMethod.HEAD.equals(request.method())) {
// Write the content.
ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise());
}
// Write the end marker.
// Close the connection when the whole content is written out.
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(ChannelFutureListener.CLOSE);
}
protected void send100Continue(ChannelHandlerContext ctx) {
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
ctx.write(response);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
cmderMap.remove(ctx.channel());
}
protected void sendNotModified(ChannelHandlerContext ctx) {
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_MODIFIED);
SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT);
Calendar time = new GregorianCalendar();
response.headers().set(DATE, dateFormatter.format(time.getTime()));
// Close the connection as soon as the error message is sent.
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
protected void setDateAndCacheHeaders(HttpResponse response, File fileToCache) {
SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT);
// Date header
Calendar time = new GregorianCalendar();
response.headers().set(DATE, dateFormatter.format(time.getTime()));
// Add cache headers
time.add(Calendar.SECOND, HTTP_CACHE_SECONDS);
response.headers().set(EXPIRES, dateFormatter.format(time.getTime()));
response.headers().set(CACHE_CONTROL, "private, max-age=" + HTTP_CACHE_SECONDS);
response.headers().set(LAST_MODIFIED, dateFormatter.format(new Date(fileToCache.lastModified())));
}
}
标签:HttpServiceDispatcher,void,ctx,headers,new,import,response 来源: https://www.cnblogs.com/game-server/p/16617430.html