
要实现责任链模式,首先需要定义具体的链路责任。看下继承关系图:
public abstract class RequestHandler { private final RequestHandler next; /**
* Request handler.
*/
public void handleRequest(Request req) { if (next != null) {
next.handleRequest(req);
}
} protected void printHandling(Request req) {
LOGGER.info("{} handling request "{}"", this, req);
} @Override
public abstract String toString();
}public class Soldier extends RequestHandler { public Soldier(RequestHandler handler) { super(handler);
} @Override
public void handleRequest(Request req) { if (RequestType.COLLECT_TAX == req.getRequestType()) {
printHandling(req);
req.markHandled();
} else { super.handleRequest(req);
}
} @Override
public String toString() { return "soldier";
}
}public class Officer extends RequestHandler { public Officer(RequestHandler handler) { super(handler);
} @Override
public void handleRequest(Request req) { if (RequestType.TORTURE_PRISONER == req.getRequestType()) {
printHandling(req);
req.markHandled();
} else { super.handleRequest(req);
}
} @Override
public String toString() { return "officer";
}
}public class Commander extends RequestHandler { public Commander(RequestHandler handler) { super(handler);
} @Override
public void handleRequest(Request req) { if (RequestType.DEFEND_CASTLE == req.getRequestType()) {
printHandling(req);
req.markHandled();
} else { super.handleRequest(req);
}
} @Override
public String toString() { return "commander";
}
}public class King { private RequestHandler chain; public King() {
buildChain();
} private void buildChain() {
chain = new Commander(new Officer(new Soldier(null)));
} public void makeRequest(Request req) {
chain.handleRequest(req);
}
}public abstract class RequestHandler<T> { protected RequestHandler next; /**
* Request handler.
*/
public void handleRequest(Request req) { if (next != null) {
next.handleRequest(req);
}
} protected void printHandling(Request req) {
LOGGER.info("{} handling request "{}"", this, req);
} @Override
public abstract String toString(); public static class Builder<T> { private RequestHandler<T> head; private RequestHandler<T> tail; public RequestHandler<T> build() { return this.head;
} public Builder<T> addHandler(RequestHandler<T> handler) { if (this.head == null) { this.head = this.tail = handler;
} this.tail.next = handler; this.tail = handler; return this;
}
}
}private void buildChain() {
RequestHandler.Builder builder = new RequestHandler.Builder();
builder.addHandler(new Commander())
.addHandler(new Officer())
.addHandler(new Soldier());
chain = builder.build();
}public interface Filter { // 省去无关代码
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException; // 省去无关代码}package javax.servlet;import java.io.IOException; /**
* A FilterChain is an object provided by the servlet container to the developer
* giving a view into the invocation chain of a filtered request for a resource. Filters
* use the FilterChain to invoke the next filter in the chain, or if the calling filter
* is the last filter in the chain, to invoke the resource at the end of the chain.
*
* @see Filter
* @since Servlet 2.3
**/public interface FilterChain { /**
* Causes the next filter in the chain to be invoked, or if the calling filter is the last filter
* in the chain, causes the resource at the end of the chain to be invoked.
*
* @param request the request to pass along the chain.
* @param response the response to pass along the chain.
*/
public void doFilter ( ServletRequest request, ServletResponse response ) throws IOException, ServletException;
}public void doFilter(ServletRequest request, ServletResponse response)
throws IOException, ServletException{ final Request baseRequest=Request.getBaseRequest(request); // pass to next filter
if (_filterHolder!=null)
{ if (LOG.isDebugEnabled())
LOG.debug("call filter {}", _filterHolder);
Filter filter= _filterHolder.getFilter(); //if the request already does not support async, then the setting for the filter
//is irrelevant. However if the request supports async but this filter does not
//temporarily turn it off for the execution of the filter
if (baseRequest.isAsyncSupported() && !_filterHolder.isAsyncSupported())
{
try
{
baseRequest.setAsyncSupported(false,_filterHolder.toString());
filter.doFilter(request, response, _next);
} finally
{
baseRequest.setAsyncSupported(true,null);
}
} else
filter.doFilter(request, response, _next); return;
} // Call servlet
HttpServletRequest srequest = (HttpServletRequest)request; if (_servletHolder == null)
notFound(baseRequest, srequest, (HttpServletResponse)response); else
{ if (LOG.isDebugEnabled())
LOG.debug("call servlet " + _servletHolder);
_servletHolder.handle(baseRequest,request, response);
}
}private static class VirtualFilterChain implements FilterChain { private final FilterChain originalChain; private final List<? extends Filter> additionalFilters; private int currentPosition = 0; public VirtualFilterChain(FilterChain chain, List<? extends Filter> additionalFilters) { this.originalChain = chain; this.additionalFilters = additionalFilters;
} @Override
public void doFilter(final ServletRequest request, final ServletResponse response)
throws IOException, ServletException { if (this.currentPosition == this.additionalFilters.size()) { this.originalChain.doFilter(request, response);
} else { this.currentPosition++;
Filter nextFilter = this.additionalFilters.get(this.currentPosition - 1);
nextFilter.doFilter(request, response, this);
}
}
}@Overridepublic void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException { new VirtualFilterChain(chain, this.filters).doFilter(request, response);
}* <pre>
* I/O Request
* via {@link Channel} or* {@link ChannelHandlerContext}
* |
* +---------------------------------------------------+---------------+
* | ChannelPipeline | |* | |/ |
* | +---------------------+ +-----------+----------+ |
* | | Inbound Handler N | | Outbound Handler 1 | |
* | +----------+----------+ +-----------+----------+ |
* | /| | |
* | | |/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler N-1 | | Outbound Handler 2 | |
* | +----------+----------+ +-----------+----------+ |
* | /| . |* | . . |* | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|* | [ method call] [method call] |* | . . |* | . |/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler 2 | | Outbound Handler M-1 | |
* | +----------+----------+ +-----------+----------+ |
* | /| | |
* | | |/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler 1 | | Outbound Handler M | |
* | +----------+----------+ +-----------+----------+ |
* | /| | |
* +---------------+-----------------------------------+---------------+
* | |/
* +---------------+-----------------------------------+---------------+
* | | | |
* | [ Socket.read() ] [ Socket.write() ] |
* | |
* | Netty Internal I/O Threads (Transport Implementation) |
* +-------------------------------------------------------------------+
* </pre>public interface ChannelHandler { /**
* Gets called after the {@link ChannelHandler} was added to the actual context and it's ready to handle events.
*/
void handlerAdded(ChannelHandlerContext ctx) throws Exception; /**
* Gets called after the {@link ChannelHandler} was removed from the actual context and it doesn't handle events
* anymore.
*/
void handlerRemoved(ChannelHandlerContext ctx) throws Exception; /**
* Gets called if a {@link Throwable} was thrown.
*
* @deprecated if you want to handle this event you should implement {@link ChannelInboundHandler} and
* implement the method there.
*/
@Deprecated
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception; /**
* Indicates that the same instance of the annotated {@link ChannelHandler}
* can be added to one or more {@link ChannelPipeline}s multiple times
* without a race condition.
* <p>
* If this annotation is not specified, you have to create a new handler
* instance every time you add it to a pipeline because it has unshared
* state such as member variables.
* <p>
* This annotation is provided for documentation purpose, just like
* <a href="http://www.javaconcurrencyinpractice.com/annotations/doc/">the JCIP annotations</a>.
*/
@Inherited
@Documented
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @interface Sharable { // no value
}
}public class DefaultChannelPipeline implements ChannelPipeline { static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class); private static final String HEAD_NAME = generateName0(HeadContext.class); private static final String TAIL_NAME = generateName0(TailContext.class); private static final FastThreadLocal<Map<Class<?>, String>> nameCaches = new FastThreadLocal<Map<Class<?>, String>>() { @Override
protected Map<Class<?>, String> initialValue() { return new WeakHashMap<Class<?>, String>();
}
}; private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, MessageSizeEstimator.Handle> ESTIMATOR =
AtomicReferenceFieldUpdater.newUpdater(
DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle"); final AbstractChannelHandlerContext head; final AbstractChannelHandlerContext tail; private final Channel channel; private final ChannelFuture succeededFuture; private final VoidChannelPromise voidPromise; private final boolean touch = ResourceLeakDetector.isEnabled(); private Map<EventExecutorGroup, EventExecutor> childExecutors; private volatile MessageSizeEstimator.Handle estimatorHandle; private boolean firstRegistration = true; /**
* This is the head of a linked list that is processed by {@link #callHandlerAddedForAllHandlers()} and so process
* all the pending {@link #callHandlerAdded0(AbstractChannelHandlerContext)}.
*
* We only keep the head because it is expected that the list is used infrequently and its size is small.
* Thus full iterations to do insertions is assumed to be a good compromised to saving memory and tail management
* complexity.
*/
private PendingHandlerCallback pendingHandlerCallbackHead; /**
* Set to {@code true} once the {@link AbstractChannel} is registered.Once set to {@code true} the value will never
* change.
*/
private boolean registered; protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}public class TorrentProcessorFactory implements ProcessorFactory { private Map<Class<?>, Processor<?>> processors() {
Map<Class<?>, Processor<?>> processors = new HashMap<>();
processors.put(TorrentContext.class, createTorrentProcessor());
processors.put(MagnetContext.class, createMagnetProcessor()); return processors;
} protected ChainProcessor<TorrentContext> createTorrentProcessor() {
ProcessingStage<TorrentContext> stage5 = new SeedStage<>(null, torrentRegistry);
ProcessingStage<TorrentContext> stage4 = new ProcessTorrentStage<>(stage5, torrentRegistry, trackerService, eventSink);
ProcessingStage<TorrentContext> stage3 = new ChooseFilesStage<>(stage4, torrentRegistry, assignmentFactory, config);
ProcessingStage<TorrentContext> stage2 = new InitializeTorrentProcessingStage<>(stage3, connectionPool,
torrentRegistry, dataWorker, bufferedPieceRegistry, manualControlService, eventSink, config);
ProcessingStage<TorrentContext> stage1 = new CreateSessionStage<>(stage2, torrentRegistry, eventSource,
connectionSource, messageDispatcher, messagingAgents, config);
ProcessingStage<TorrentContext> stage0 = new FetchTorrentStage(stage1, eventSink); return new ChainProcessor<>(stage0, executor, new TorrentContextFinalizer<>(torrentRegistry, eventSink));
} protected ChainProcessor<MagnetContext> createMagnetProcessor() {
ProcessingStage<MagnetContext> stage5 = new SeedStage<>(null, torrentRegistry);
ProcessingStage<MagnetContext> stage4 = new ProcessMagnetTorrentStage(stage5, torrentRegistry, trackerService, eventSink);
ProcessingStage<MagnetContext> stage3 = new ChooseFilesStage<>(stage4, torrentRegistry, assignmentFactory, config);
ProcessingStage<MagnetContext> stage2 = new InitializeMagnetTorrentProcessingStage(stage3, connectionPool,
torrentRegistry, dataWorker, bufferedPieceRegistry, manualControlService, eventSink, config);
ProcessingStage<MagnetContext> stage1 = new FetchMetadataStage(stage2, metadataService, torrentRegistry,
peerRegistry, eventSink, eventSource, config);
ProcessingStage<MagnetContext> stage0 = new CreateSessionStage<>(stage1, torrentRegistry, eventSource,
connectionSource, messageDispatcher, messagingAgents, config); return new ChainProcessor<>(stage0, executor, new TorrentContextFinalizer<>(torrentRegistry, eventSink));
}package bt.processor;import bt.processor.listener.ProcessingEvent;/**
* @param <C> Type of processing context
* @since 1.3
*/public interface ProcessingStage<C extends ProcessingContext> { /**
* @return Type of event, that should be triggered after this stage has completed.
* @since 1.5
*/
ProcessingEvent after(); /**
* @param context Processing context
* @return Next stage
* @since 1.3
*/
ProcessingStage<C> execute(C context);
}本网信息来自于互联网,目的在于传递更多信息,并不代表本网赞同其观点。其原创性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容、文字的真实性、完整性、及时性本站不作任何保证或承诺,并请自行核实相关内容。本站不承担此类作品侵权行为的直接责任及连带责任。如若本网有任何内容侵犯您的权益,请及时联系我们,本站将会在24小时内处理完毕,E-mail:xinmeigg88@163.com
本文链接:http://www.xrbh.cn/tnews/4083.html