要实现责任链模式,首先需要定义具体的链路责任。看下继承关系图:
public abstract class RequestHandler { private final RequestHandler next; /** * Request handler. */ public void handleRequest(Request req) { if (next != null) { next.handleRequest(req); } } protected void printHandling(Request req) { LOGGER.info("{} handling request "{}"", this, req); } @Override public abstract String toString(); }
/**
 * Chain link that handles {@code COLLECT_TAX} requests and delegates every other request to the
 * base class.
 */
public class Soldier extends RequestHandler {

    /**
     * @param handler successor passed on to the base class constructor
     */
    public Soldier(RequestHandler handler) {
        super(handler);
    }

    /**
     * Handles the request when its type is {@code COLLECT_TAX}; otherwise defers to
     * {@code super.handleRequest}.
     */
    @Override
    public void handleRequest(Request req) {
        if (req.getRequestType() != RequestType.COLLECT_TAX) {
            // Not ours: let the base class pass it along.
            super.handleRequest(req);
            return;
        }
        printHandling(req);
        req.markHandled();
    }

    @Override
    public String toString() {
        return "soldier";
    }
}
/**
 * Chain link that handles {@code TORTURE_PRISONER} requests and delegates every other request to
 * the base class.
 */
public class Officer extends RequestHandler {

    /**
     * @param handler successor passed on to the base class constructor
     */
    public Officer(RequestHandler handler) {
        super(handler);
    }

    /**
     * Handles the request when its type is {@code TORTURE_PRISONER}; otherwise defers to
     * {@code super.handleRequest}.
     */
    @Override
    public void handleRequest(Request req) {
        if (req.getRequestType() != RequestType.TORTURE_PRISONER) {
            // Not ours: let the base class pass it along.
            super.handleRequest(req);
            return;
        }
        printHandling(req);
        req.markHandled();
    }

    @Override
    public String toString() {
        return "officer";
    }
}
/**
 * Chain link that handles {@code DEFEND_CASTLE} requests and delegates every other request to the
 * base class.
 */
public class Commander extends RequestHandler {

    /**
     * @param handler successor passed on to the base class constructor
     */
    public Commander(RequestHandler handler) {
        super(handler);
    }

    /**
     * Handles the request when its type is {@code DEFEND_CASTLE}; otherwise defers to
     * {@code super.handleRequest}.
     */
    @Override
    public void handleRequest(Request req) {
        if (req.getRequestType() != RequestType.DEFEND_CASTLE) {
            // Not ours: let the base class pass it along.
            super.handleRequest(req);
            return;
        }
        printHandling(req);
        req.markHandled();
    }

    @Override
    public String toString() {
        return "commander";
    }
}
/**
 * Client of the chain: builds the handler chain once and submits requests to its first link.
 */
public class King {

    /** First handler of the chain; assigned by {@link #buildChain()}. */
    private RequestHandler chain;

    public King() {
        buildChain();
    }

    /**
     * Wires the handlers from the last link to the first, so the resulting order is
     * commander, then officer, then soldier.
     */
    private void buildChain() {
        RequestHandler soldier = new Soldier(null);
        RequestHandler officer = new Officer(soldier);
        chain = new Commander(officer);
    }

    /**
     * Hands the request to the first handler of the chain.
     *
     * @param req the request to process
     */
    public void makeRequest(Request req) {
        chain.handleRequest(req);
    }
}
public abstract class RequestHandler<T> { protected RequestHandler next; /** * Request handler. */ public void handleRequest(Request req) { if (next != null) { next.handleRequest(req); } } protected void printHandling(Request req) { LOGGER.info("{} handling request "{}"", this, req); } @Override public abstract String toString(); public static class Builder<T> { private RequestHandler<T> head; private RequestHandler<T> tail; public RequestHandler<T> build() { return this.head; } public Builder<T> addHandler(RequestHandler<T> handler) { if (this.head == null) { this.head = this.tail = handler; } this.tail.next = handler; this.tail = handler; return this; } } }
/**
 * Assembles the chain through the builder, adding a commander, an officer and a soldier in that
 * order, and stores the built chain in {@code chain}.
 */
private void buildChain() {
    chain = new RequestHandler.Builder()
            .addHandler(new Commander())
            .addHandler(new Officer())
            .addHandler(new Soldier())
            .build();
}
public interface Filter { // 省去无关代码 public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException; // 省去无关代码}
package javax.servlet;

import java.io.IOException;

/**
 * View into the invocation chain of a filtered request for a resource, provided to the developer
 * by the servlet container. A filter uses it to invoke the next filter in the chain or, when the
 * calling filter is the last one, the resource at the end of the chain.
 *
 * @see Filter
 * @since Servlet 2.3
 */
public interface FilterChain {

    /**
     * Invokes the next filter in the chain, or the resource at the end of the chain when the
     * calling filter is the last filter.
     *
     * @param request  the request to pass along the chain.
     * @param response the response to pass along the chain.
     */
    void doFilter(ServletRequest request, ServletResponse response)
            throws IOException, ServletException;
}
public void doFilter(ServletRequest request, ServletResponse response) throws IOException, ServletException{ final Request baseRequest=Request.getBaseRequest(request); // pass to next filter if (_filterHolder!=null) { if (LOG.isDebugEnabled()) LOG.debug("call filter {}", _filterHolder); Filter filter= _filterHolder.getFilter(); //if the request already does not support async, then the setting for the filter //is irrelevant. However if the request supports async but this filter does not //temporarily turn it off for the execution of the filter if (baseRequest.isAsyncSupported() && !_filterHolder.isAsyncSupported()) { try { baseRequest.setAsyncSupported(false,_filterHolder.toString()); filter.doFilter(request, response, _next); } finally { baseRequest.setAsyncSupported(true,null); } } else filter.doFilter(request, response, _next); return; } // Call servlet HttpServletRequest srequest = (HttpServletRequest)request; if (_servletHolder == null) notFound(baseRequest, srequest, (HttpServletResponse)response); else { if (LOG.isDebugEnabled()) LOG.debug("call servlet " + _servletHolder); _servletHolder.handle(baseRequest,request, response); } }
private static class VirtualFilterChain implements FilterChain { private final FilterChain originalChain; private final List<? extends Filter> additionalFilters; private int currentPosition = 0; public VirtualFilterChain(FilterChain chain, List<? extends Filter> additionalFilters) { this.originalChain = chain; this.additionalFilters = additionalFilters; } @Override public void doFilter(final ServletRequest request, final ServletResponse response) throws IOException, ServletException { if (this.currentPosition == this.additionalFilters.size()) { this.originalChain.doFilter(request, response); } else { this.currentPosition++; Filter nextFilter = this.additionalFilters.get(this.currentPosition - 1); nextFilter.doFilter(request, response, this); } } }
/**
 * Wraps the supplied chain together with {@code this.filters} in a new
 * {@code VirtualFilterChain} and starts that chain with the given request and response.
 *
 * @param request  the request to filter
 * @param response the response to filter
 * @param chain    the chain passed to the new {@code VirtualFilterChain}
 */
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
        throws IOException, ServletException {
    new VirtualFilterChain(chain, this.filters).doFilter(request, response);
}
 * <pre>
 *                                                 I/O Request
 *                                            via {@link Channel} or
 *                                        {@link ChannelHandlerContext}
 *                                                      |
 *  +---------------------------------------------------+---------------+
 *  |                           ChannelPipeline         |               |
 *  |                                                  \|/              |
 *  |    +---------------------+            +-----------+----------+    |
 *  |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  |               |                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  .               |
 *  |               .                                   .               |
 *  | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
 *  |        [ method call]                       [method call]         |
 *  |               .                                   .               |
 *  |               .                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  |               |                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  +---------------+-----------------------------------+---------------+
 *                  |                                  \|/
 *  +---------------+-----------------------------------+---------------+
 *  |               |                                   |               |
 *  |       [ Socket.read() ]                    [ Socket.write() ]     |
 *  |                                                                   |
 *  |  Netty Internal I/O Threads (Transport Implementation)            |
 *  +-------------------------------------------------------------------+
 * </pre>
public interface ChannelHandler { /** * Gets called after the {@link ChannelHandler} was added to the actual context and it's ready to handle events. */ void handlerAdded(ChannelHandlerContext ctx) throws Exception; /** * Gets called after the {@link ChannelHandler} was removed from the actual context and it doesn't handle events * anymore. */ void handlerRemoved(ChannelHandlerContext ctx) throws Exception; /** * Gets called if a {@link Throwable} was thrown. * * @deprecated if you want to handle this event you should implement {@link ChannelInboundHandler} and * implement the method there. */ @Deprecated void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception; /** * Indicates that the same instance of the annotated {@link ChannelHandler} * can be added to one or more {@link ChannelPipeline}s multiple times * without a race condition. * <p> * If this annotation is not specified, you have to create a new handler * instance every time you add it to a pipeline because it has unshared * state such as member variables. * <p> * This annotation is provided for documentation purpose, just like * <a href="http://www.javaconcurrencyinpractice.com/annotations/doc/">the JCIP annotations</a>. */ @Inherited @Documented @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @interface Sharable { // no value } }
/**
 * {@link ChannelPipeline} implementation that keeps its handler contexts between a fixed
 * {@code head} and {@code tail} context, linked through {@code next}/{@code prev} references.
 * Only the field declarations and the constructor are part of this excerpt.
 */
public class DefaultChannelPipeline implements ChannelPipeline {

    static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);

    // Names generated from the head and tail context classes.
    private static final String HEAD_NAME = generateName0(HeadContext.class);
    private static final String TAIL_NAME = generateName0(TailContext.class);

    // Per-thread map from class to name; each thread starts with an empty WeakHashMap.
    private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
            new FastThreadLocal<Map<Class<?>, String>>() {
        @Override
        protected Map<Class<?>, String> initialValue() {
            return new WeakHashMap<Class<?>, String>();
        }
    };

    // Field updater bound to the volatile "estimatorHandle" field declared below.
    private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, MessageSizeEstimator.Handle> ESTIMATOR =
            AtomicReferenceFieldUpdater.newUpdater(
                    DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle");

    // Fixed end points of the context list; created and linked in the constructor.
    final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;

    private final Channel channel;
    private final ChannelFuture succeededFuture;
    private final VoidChannelPromise voidPromise;
    private final boolean touch = ResourceLeakDetector.isEnabled();

    private Map<EventExecutorGroup, EventExecutor> childExecutors;
    private volatile MessageSizeEstimator.Handle estimatorHandle;
    private boolean firstRegistration = true;

    /**
     * This is the head of a linked list that is processed by {@link #callHandlerAddedForAllHandlers()} and so
     * processes all the pending {@link #callHandlerAdded0(AbstractChannelHandlerContext)}.
     *
     * We only keep the head because it is expected that the list is used infrequently and its size is small.
     * Thus full iterations to do insertions are assumed to be a good compromise for saving memory and
     * tail management complexity.
     */
    private PendingHandlerCallback pendingHandlerCallbackHead;

    /**
     * Set to {@code true} once the {@link AbstractChannel} is registered. Once set to {@code true} the value
     * will never change.
     */
    private boolean registered;

    /**
     * Creates the pipeline for the given channel with an initial list that holds only the head
     * and tail contexts.
     *
     * @param channel the channel this pipeline belongs to; passed through
     *                {@code ObjectUtil.checkNotNull}
     */
    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise = new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        // Initial doubly linked list: head <-> tail.
        head.next = tail;
        tail.prev = head;
    }
/**
 * {@link ProcessorFactory} that wires the processing stages for torrent and magnet contexts
 * into chain processors. Only part of the class is shown in this excerpt.
 */
public class TorrentProcessorFactory implements ProcessorFactory {

    /**
     * Builds the lookup table from context type to the processor created for it.
     */
    private Map<Class<?>, Processor<?>> processors() {
        Map<Class<?>, Processor<?>> byContextType = new HashMap<>();
        byContextType.put(TorrentContext.class, createTorrentProcessor());
        byContextType.put(MagnetContext.class, createMagnetProcessor());
        return byContextType;
    }

    /**
     * Builds the stage chain for {@link TorrentContext}. Each stage receives its successor as
     * first constructor argument, so the stages are created from the last one to the first one.
     */
    protected ChainProcessor<TorrentContext> createTorrentProcessor() {
        // Last stage: it has no successor.
        ProcessingStage<TorrentContext> seed = new SeedStage<>(null, torrentRegistry);

        ProcessingStage<TorrentContext> processTorrent = new ProcessTorrentStage<>(
                seed, torrentRegistry, trackerService, eventSink);

        ProcessingStage<TorrentContext> chooseFiles = new ChooseFilesStage<>(
                processTorrent, torrentRegistry, assignmentFactory, config);

        ProcessingStage<TorrentContext> initialize = new InitializeTorrentProcessingStage<>(
                chooseFiles, connectionPool, torrentRegistry, dataWorker, bufferedPieceRegistry,
                manualControlService, eventSink, config);

        ProcessingStage<TorrentContext> createSession = new CreateSessionStage<>(
                initialize, torrentRegistry, eventSource, connectionSource, messageDispatcher,
                messagingAgents, config);

        // First stage of the chain.
        ProcessingStage<TorrentContext> fetchTorrent = new FetchTorrentStage(createSession, eventSink);

        return new ChainProcessor<>(fetchTorrent, executor,
                new TorrentContextFinalizer<>(torrentRegistry, eventSink));
    }

    /**
     * Builds the stage chain for {@link MagnetContext}. Each stage receives its successor as
     * first constructor argument, so the stages are created from the last one to the first one.
     */
    protected ChainProcessor<MagnetContext> createMagnetProcessor() {
        // Last stage: it has no successor.
        ProcessingStage<MagnetContext> seed = new SeedStage<>(null, torrentRegistry);

        ProcessingStage<MagnetContext> processTorrent = new ProcessMagnetTorrentStage(
                seed, torrentRegistry, trackerService, eventSink);

        ProcessingStage<MagnetContext> chooseFiles = new ChooseFilesStage<>(
                processTorrent, torrentRegistry, assignmentFactory, config);

        ProcessingStage<MagnetContext> initialize = new InitializeMagnetTorrentProcessingStage(
                chooseFiles, connectionPool, torrentRegistry, dataWorker, bufferedPieceRegistry,
                manualControlService, eventSink, config);

        ProcessingStage<MagnetContext> fetchMetadata = new FetchMetadataStage(
                initialize, metadataService, torrentRegistry, peerRegistry, eventSink, eventSource,
                config);

        // First stage of the chain.
        ProcessingStage<MagnetContext> createSession = new CreateSessionStage<>(
                fetchMetadata, torrentRegistry, eventSource, connectionSource, messageDispatcher,
                messagingAgents, config);

        return new ChainProcessor<>(createSession, executor,
                new TorrentContextFinalizer<>(torrentRegistry, eventSink));
    }
package bt.processor;

import bt.processor.listener.ProcessingEvent;

/**
 * A single stage that is executed against a processing context and yields the stage to run next.
 *
 * @param <C> Type of processing context
 * @since 1.3
 */
public interface ProcessingStage<C extends ProcessingContext> {

    /**
     * Tells which event is to be triggered when this stage is done.
     *
     * @return Type of event, that should be triggered after this stage has completed.
     * @since 1.5
     */
    ProcessingEvent after();

    /**
     * Executes this stage on the given context.
     *
     * @param context Processing context
     * @return Next stage
     * @since 1.3
     */
    ProcessingStage<C> execute(C context);
}
本网信息来自于互联网,目的在于传递更多信息,并不代表本网赞同其观点。其原创性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容、文字的真实性、完整性、及时性本站不作任何保证或承诺,并请自行核实相关内容。本站不承担此类作品侵权行为的直接责任及连带责任。如若本网有任何内容侵犯您的权益,请及时联系我们,本站将会在24小时内处理完毕,E-mail:xinmeigg88@163.com
本文链接:http://www.xrbh.cn/tnews/4083.html