当前位置:首页 > 资讯 > 正文

【设计模式】责任链模式(Chain of Responsibility Pattern)

【设计模式】责任链模式(Chain of Responsibility Pattern)



要实现责任链模式,首先需要定义具体的链路责任。看下继承关系图:

public abstract class RequestHandler {  private final RequestHandler next;  /**
   * Request handler.
   */
  public void handleRequest(Request req) {    if (next != null) {
      next.handleRequest(req);
    }
  }  protected void printHandling(Request req) {
    LOGGER.info("{} handling request "{}"", this, req);
  }  @Override
  public abstract String toString();
}
public class Soldier extends RequestHandler {  public Soldier(RequestHandler handler) {    super(handler);
  }  @Override
  public void handleRequest(Request req) {    if (RequestType.COLLECT_TAX == req.getRequestType()) {
      printHandling(req);
      req.markHandled();
    } else {      super.handleRequest(req);
    }
  }  @Override
  public String toString() {    return "soldier";
  }
}
public class Officer extends RequestHandler {  public Officer(RequestHandler handler) {    super(handler);
  }  @Override
  public void handleRequest(Request req) {    if (RequestType.TORTURE_PRISONER == req.getRequestType()) {
      printHandling(req);
      req.markHandled();
    } else {      super.handleRequest(req);
    }
  }  @Override
  public String toString() {    return "officer";
  }

}
public class Commander extends RequestHandler {  public Commander(RequestHandler handler) {    super(handler);
  }  @Override
  public void handleRequest(Request req) {    if (RequestType.DEFEND_CASTLE == req.getRequestType()) {
      printHandling(req);
      req.markHandled();
    } else {      super.handleRequest(req);
    }
  }  @Override
  public String toString() {    return "commander";
  }
}
public class King {  private RequestHandler chain;  public King() {
    buildChain();
  }  private void buildChain() {
    chain = new Commander(new Officer(new Soldier(null)));
  }  public void makeRequest(Request req) {
    chain.handleRequest(req);
  }

}
public abstract class RequestHandler<T> {    protected RequestHandler next;    /**
     * Request handler.
     */
    public void handleRequest(Request req) {        if (next != null) {
            next.handleRequest(req);
        }
    }    protected void printHandling(Request req) {
        LOGGER.info("{} handling request "{}"", this, req);
    }    @Override
    public abstract String toString();    public static class Builder<T> {        private RequestHandler<T> head;        private RequestHandler<T> tail;        public RequestHandler<T> build() {            return this.head;
        }        public Builder<T> addHandler(RequestHandler<T> handler) {            if (this.head == null) {                this.head = this.tail = handler;
            }            this.tail.next = handler;            this.tail = handler;            return this;
        }
    }
}
private void buildChain() {
  RequestHandler.Builder builder = new RequestHandler.Builder();
  builder.addHandler(new Commander())
          .addHandler(new Officer())
          .addHandler(new Soldier());
  chain = builder.build();
}
public interface Filter {    // 省去无关代码 

    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
            throws IOException, ServletException;    // 省去无关代码}
package javax.servlet;

import java.io.IOException;

/**
 * A FilterChain is supplied by the servlet container and gives the developer a
 * view into the invocation chain of a filtered request for a resource. A filter
 * uses it to invoke the next filter in the chain or, when the calling filter is
 * the last one, to invoke the resource at the end of the chain.
 *
 * @see Filter
 * @since Servlet 2.3
 */
public interface FilterChain {

    /**
     * Invokes the next filter in the chain; if the calling filter is the last
     * one, invokes the resource at the end of the chain instead.
     *
     * @param request the request to pass along the chain.
     * @param response the response to pass along the chain.
     */
    void doFilter(ServletRequest request, ServletResponse response)
            throws IOException, ServletException;

}
public void doFilter(ServletRequest request, ServletResponse response)
    throws IOException, ServletException{    final Request baseRequest=Request.getBaseRequest(request);    // pass to next filter
    if (_filterHolder!=null)
    {        if (LOG.isDebugEnabled())
            LOG.debug("call filter {}", _filterHolder);
        Filter filter= _filterHolder.getFilter();        //if the request already does not support async, then the setting for the filter
        //is irrelevant. However if the request supports async but this filter does not
        //temporarily turn it off for the execution of the filter
        if (baseRequest.isAsyncSupported() && !_filterHolder.isAsyncSupported())
        { 
            try
            {
                baseRequest.setAsyncSupported(false,_filterHolder.toString());
                filter.doFilter(request, response, _next);
            }            finally
            {
                baseRequest.setAsyncSupported(true,null);
            }
        }        else
            filter.doFilter(request, response, _next);        return;
    }    // Call servlet
    HttpServletRequest srequest = (HttpServletRequest)request;    if (_servletHolder == null)
        notFound(baseRequest, srequest, (HttpServletResponse)response);    else
    {        if (LOG.isDebugEnabled())
            LOG.debug("call servlet " + _servletHolder);
        _servletHolder.handle(baseRequest,request, response);
    }
}
private static class VirtualFilterChain implements FilterChain {   private final FilterChain originalChain;   private final List<? extends Filter> additionalFilters;   private int currentPosition = 0;   public VirtualFilterChain(FilterChain chain, List<? extends Filter> additionalFilters) {      this.originalChain = chain;      this.additionalFilters = additionalFilters;
   }   @Override
   public void doFilter(final ServletRequest request, final ServletResponse response)
         throws IOException, ServletException {      if (this.currentPosition == this.additionalFilters.size()) {         this.originalChain.doFilter(request, response);
      }      else {         this.currentPosition++;
         Filter nextFilter = this.additionalFilters.get(this.currentPosition - 1);
         nextFilter.doFilter(request, response, this);
      }
   }
}
@Overridepublic void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
      throws IOException, ServletException {   new VirtualFilterChain(chain, this.filters).doFilter(request, response);
}
* <pre>
*                                                 I/O Request
*                                            via {@link Channel} or
*                                        {@link ChannelHandlerContext}
*                                                      |
*  +---------------------------------------------------+---------------+
*  |                           ChannelPipeline         |               |
*  |                                                  \|/              |
*  |    +---------------------+            +-----------+----------+    |
*  |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
*  |    +----------+----------+            +-----------+----------+    |
*  |              /|\                                  |               |
*  |               |                                  \|/              |
*  |    +----------+----------+            +-----------+----------+    |
*  |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
*  |    +----------+----------+            +-----------+----------+    |
*  |              /|\                                  .               |
*  |               .                                   .               |
*  | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
*  |        [ method call]                       [method call]         |
*  |               .                                   .               |
*  |               .                                  \|/              |
*  |    +----------+----------+            +-----------+----------+    |
*  |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
*  |    +----------+----------+            +-----------+----------+    |
*  |              /|\                                  |               |
*  |               |                                  \|/              |
*  |    +----------+----------+            +-----------+----------+    |
*  |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
*  |    +----------+----------+            +-----------+----------+    |
*  |              /|\                                  |               |
*  +---------------+-----------------------------------+---------------+
*                  |                                  \|/
*  +---------------+-----------------------------------+---------------+
*  |               |                                   |               |
*  |       [ Socket.read() ]                    [ Socket.write() ]     |
*  |                                                                   |
*  |  Netty Internal I/O Threads (Transport Implementation)            |
*  +-------------------------------------------------------------------+
* </pre>
public interface ChannelHandler {    /**
     * Gets called after the {@link ChannelHandler} was added to the actual context and it's ready to handle events.
     */
    void handlerAdded(ChannelHandlerContext ctx) throws Exception;    /**
     * Gets called after the {@link ChannelHandler} was removed from the actual context and it doesn't handle events
     * anymore.
     */
    void handlerRemoved(ChannelHandlerContext ctx) throws Exception;    /**
     * Gets called if a {@link Throwable} was thrown.
     *
     * @deprecated if you want to handle this event you should implement {@link ChannelInboundHandler} and
     * implement the method there.
     */
    @Deprecated
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;    /**
     * Indicates that the same instance of the annotated {@link ChannelHandler}
     * can be added to one or more {@link ChannelPipeline}s multiple times
     * without a race condition.
     * <p>
     * If this annotation is not specified, you have to create a new handler
     * instance every time you add it to a pipeline because it has unshared
     * state such as member variables.
     * <p>
     * This annotation is provided for documentation purpose, just like
     * <a href="http://www.javaconcurrencyinpractice.com/annotations/doc/">the JCIP annotations</a>.
     */
    @Inherited
    @Documented
    @Target(ElementType.TYPE)    @Retention(RetentionPolicy.RUNTIME)    @interface Sharable {        // no value
    }
}
public class DefaultChannelPipeline implements ChannelPipeline {    static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);    private static final String HEAD_NAME = generateName0(HeadContext.class);    private static final String TAIL_NAME = generateName0(TailContext.class);    private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =            new FastThreadLocal<Map<Class<?>, String>>() {        @Override
        protected Map<Class<?>, String> initialValue() {            return new WeakHashMap<Class<?>, String>();
        }
    };    private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, MessageSizeEstimator.Handle> ESTIMATOR =
            AtomicReferenceFieldUpdater.newUpdater(
                    DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle");    final AbstractChannelHandlerContext head;    final AbstractChannelHandlerContext tail;    private final Channel channel;    private final ChannelFuture succeededFuture;    private final VoidChannelPromise voidPromise;    private final boolean touch = ResourceLeakDetector.isEnabled();    private Map<EventExecutorGroup, EventExecutor> childExecutors;    private volatile MessageSizeEstimator.Handle estimatorHandle;    private boolean firstRegistration = true;    /**
     * This is the head of a linked list that is processed by {@link #callHandlerAddedForAllHandlers()} and so process
     * all the pending {@link #callHandlerAdded0(AbstractChannelHandlerContext)}.
     *
     * We only keep the head because it is expected that the list is used infrequently and its size is small.
     * Thus full iterations to do insertions is assumed to be a good compromised to saving memory and tail management
     * complexity.
     */
    private PendingHandlerCallback pendingHandlerCallbackHead;    /**
     * Set to {@code true} once the {@link AbstractChannel} is registered.Once set to {@code true} the value will never
     * change.
     */
    private boolean registered;    protected DefaultChannelPipeline(Channel channel) {        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }
public class TorrentProcessorFactory implements ProcessorFactory {    private Map<Class<?>, Processor<?>> processors() {
        Map<Class<?>, Processor<?>> processors = new HashMap<>();

        processors.put(TorrentContext.class, createTorrentProcessor());
        processors.put(MagnetContext.class, createMagnetProcessor());        return processors;
    }    protected ChainProcessor<TorrentContext> createTorrentProcessor() {

        ProcessingStage<TorrentContext> stage5 = new SeedStage<>(null, torrentRegistry);

        ProcessingStage<TorrentContext> stage4 = new ProcessTorrentStage<>(stage5, torrentRegistry, trackerService, eventSink);

        ProcessingStage<TorrentContext> stage3 = new ChooseFilesStage<>(stage4, torrentRegistry, assignmentFactory, config);

        ProcessingStage<TorrentContext> stage2 = new InitializeTorrentProcessingStage<>(stage3, connectionPool,
                torrentRegistry, dataWorker, bufferedPieceRegistry, manualControlService, eventSink, config);

        ProcessingStage<TorrentContext> stage1 = new CreateSessionStage<>(stage2, torrentRegistry, eventSource,
                connectionSource, messageDispatcher, messagingAgents, config);

        ProcessingStage<TorrentContext> stage0 = new FetchTorrentStage(stage1, eventSink);        return new ChainProcessor<>(stage0, executor, new TorrentContextFinalizer<>(torrentRegistry, eventSink));
    }    protected ChainProcessor<MagnetContext> createMagnetProcessor() {

        ProcessingStage<MagnetContext> stage5 = new SeedStage<>(null, torrentRegistry);

        ProcessingStage<MagnetContext> stage4 = new ProcessMagnetTorrentStage(stage5, torrentRegistry, trackerService, eventSink);

        ProcessingStage<MagnetContext> stage3 = new ChooseFilesStage<>(stage4, torrentRegistry, assignmentFactory, config);

        ProcessingStage<MagnetContext> stage2 = new InitializeMagnetTorrentProcessingStage(stage3, connectionPool,
                torrentRegistry, dataWorker, bufferedPieceRegistry, manualControlService, eventSink, config);

        ProcessingStage<MagnetContext> stage1 = new FetchMetadataStage(stage2, metadataService, torrentRegistry,
                peerRegistry, eventSink, eventSource, config);

        ProcessingStage<MagnetContext> stage0 = new CreateSessionStage<>(stage1, torrentRegistry, eventSource,
                connectionSource, messageDispatcher, messagingAgents, config);        return new ChainProcessor<>(stage0, executor, new TorrentContextFinalizer<>(torrentRegistry, eventSink));
    }
package bt.processor;import bt.processor.listener.ProcessingEvent;/**
 * @param <C> Type of processing context
 * @since 1.3
 */public interface ProcessingStage<C extends ProcessingContext> {    /**
     * @return Type of event, that should be triggered after this stage has completed.
     * @since 1.5
     */
    ProcessingEvent after();    /**
     * @param context Processing context
     * @return Next stage
     * @since 1.3
     */
    ProcessingStage<C> execute(C context);
}