1201 lines
44 KiB
Java
1201 lines
44 KiB
Java
/*
|
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
* contributor license agreements. See the NOTICE file distributed with
|
|
* this work for additional information regarding copyright ownership.
|
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
* (the "License"); you may not use this file except in compliance with
|
|
* the License. You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
package org.apache.coyote;
|
|
|
|
import java.net.InetAddress;
|
|
import java.nio.ByteBuffer;
|
|
import java.util.Collections;
|
|
import java.util.Map;
|
|
import java.util.Set;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.Executor;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
import javax.management.InstanceNotFoundException;
|
|
import javax.management.MBeanRegistration;
|
|
import javax.management.MBeanRegistrationException;
|
|
import javax.management.MBeanServer;
|
|
import javax.management.MalformedObjectNameException;
|
|
import javax.management.ObjectName;
|
|
import javax.servlet.http.HttpUpgradeHandler;
|
|
import javax.servlet.http.WebConnection;
|
|
|
|
import org.apache.juli.logging.Log;
|
|
import org.apache.tomcat.InstanceManager;
|
|
import org.apache.tomcat.util.ExceptionUtils;
|
|
import org.apache.tomcat.util.collections.SynchronizedStack;
|
|
import org.apache.tomcat.util.modeler.Registry;
|
|
import org.apache.tomcat.util.net.AbstractEndpoint;
|
|
import org.apache.tomcat.util.net.AbstractEndpoint.Handler;
|
|
import org.apache.tomcat.util.net.SocketEvent;
|
|
import org.apache.tomcat.util.net.SocketWrapperBase;
|
|
import org.apache.tomcat.util.res.StringManager;
|
|
|
|
public abstract class AbstractProtocol<S> implements ProtocolHandler,
|
|
MBeanRegistration {
|
|
|
|
/**
|
|
* The string manager for this package.
|
|
*/
|
|
private static final StringManager sm = StringManager.getManager(AbstractProtocol.class);
|
|
|
|
|
|
/**
|
|
* Counter used to generate unique JMX names for connectors using automatic
|
|
* port binding.
|
|
*/
|
|
private static final AtomicInteger nameCounter = new AtomicInteger(0);
|
|
|
|
|
|
/**
|
|
* Name of MBean for the Global Request Processor.
|
|
*/
|
|
protected ObjectName rgOname = null;
|
|
|
|
|
|
/**
|
|
* Unique ID for this connector. Only used if the connector is configured
|
|
* to use a random port as the port will change if stop(), start() is
|
|
* called.
|
|
*/
|
|
private int nameIndex = 0;
|
|
|
|
|
|
/**
|
|
* Endpoint that provides low-level network I/O - must be matched to the
|
|
* ProtocolHandler implementation (ProtocolHandler using NIO, requires NIO
|
|
* Endpoint etc.).
|
|
*/
|
|
private final AbstractEndpoint<S> endpoint;
|
|
|
|
|
|
private Handler<S> handler;
|
|
|
|
|
|
private final Set<Processor> waitingProcessors =
|
|
Collections.newSetFromMap(new ConcurrentHashMap<Processor, Boolean>());
|
|
|
|
|
|
/**
|
|
* The timeout thread.
|
|
*/
|
|
private AsyncTimeout asyncTimeout = null;
|
|
|
|
|
|
public AbstractProtocol(AbstractEndpoint<S> endpoint) {
|
|
this.endpoint = endpoint;
|
|
setSoLinger(Constants.DEFAULT_CONNECTION_LINGER);
|
|
setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
|
|
}
|
|
|
|
|
|
// ----------------------------------------------- Generic property handling
|
|
|
|
/**
|
|
* Generic property setter used by the digester. Other code should not need
|
|
* to use this. The digester will only use this method if it can't find a
|
|
* more specific setter. That means the property belongs to the Endpoint,
|
|
* the ServerSocketFactory or some other lower level component. This method
|
|
* ensures that it is visible to both.
|
|
*
|
|
* @param name The name of the property to set
|
|
* @param value The value, in string form, to set for the property
|
|
*
|
|
* @return <code>true</code> if the property was set successfully, otherwise
|
|
* <code>false</code>
|
|
*/
|
|
public boolean setProperty(String name, String value) {
|
|
return endpoint.setProperty(name, value);
|
|
}
|
|
|
|
|
|
/**
|
|
* Generic property getter used by the digester. Other code should not need
|
|
* to use this.
|
|
*
|
|
* @param name The name of the property to get
|
|
*
|
|
* @return The value of the property converted to a string
|
|
*/
|
|
public String getProperty(String name) {
|
|
return endpoint.getProperty(name);
|
|
}
|
|
|
|
|
|
// ------------------------------- Properties managed by the ProtocolHandler
|
|
|
|
/**
|
|
* The adapter provides the link between the ProtocolHandler and the
|
|
* connector.
|
|
*/
|
|
protected Adapter adapter;
|
|
@Override
|
|
public void setAdapter(Adapter adapter) { this.adapter = adapter; }
|
|
@Override
|
|
public Adapter getAdapter() { return adapter; }
|
|
|
|
|
|
/**
|
|
* The maximum number of idle processors that will be retained in the cache
|
|
* and re-used with a subsequent request. The default is 200. A value of -1
|
|
* means unlimited. In the unlimited case, the theoretical maximum number of
|
|
* cached Processor objects is {@link #getMaxConnections()} although it will
|
|
* usually be closer to {@link #getMaxThreads()}.
|
|
*/
|
|
protected int processorCache = 200;
|
|
public int getProcessorCache() { return this.processorCache; }
|
|
public void setProcessorCache(int processorCache) {
|
|
this.processorCache = processorCache;
|
|
}
|
|
|
|
|
|
/**
|
|
* When client certificate information is presented in a form other than
|
|
* instances of {@link java.security.cert.X509Certificate} it needs to be
|
|
* converted before it can be used and this property controls which JSSE
|
|
* provider is used to perform the conversion. For example it is used with
|
|
* the AJP connectors, the HTTP APR connector and with the
|
|
* {@link org.apache.catalina.valves.SSLValve}. If not specified, the
|
|
* default provider will be used.
|
|
*/
|
|
protected String clientCertProvider = null;
|
|
public String getClientCertProvider() { return clientCertProvider; }
|
|
public void setClientCertProvider(String s) { this.clientCertProvider = s; }
|
|
|
|
|
|
@Override
|
|
public boolean isAprRequired() {
|
|
return false;
|
|
}
|
|
|
|
|
|
@Override
|
|
public boolean isSendfileSupported() {
|
|
return endpoint.getUseSendfile();
|
|
}
|
|
|
|
|
|
public AsyncTimeout getAsyncTimeout() {
|
|
return asyncTimeout;
|
|
}
|
|
|
|
/**
|
|
* Specifies whether the reason phrase will be sent in the response.
|
|
* By default a reason phrase will not be sent in the response.
|
|
*
|
|
* @deprecated This option will be removed in Tomcat 9. Reason phrase will
|
|
* not be sent.
|
|
*/
|
|
@Deprecated
|
|
private boolean sendReasonPhrase = false;
|
|
/**
|
|
* Returns whether the reason phrase will be sent in the response.
|
|
* By default a reason phrase will not be sent in the response.
|
|
*
|
|
* @return whether the reason phrase will be sent
|
|
* @deprecated This option will be removed in Tomcat 9. Reason phrase will
|
|
* not be sent.
|
|
*/
|
|
@Deprecated
|
|
public boolean getSendReasonPhrase() {
|
|
return sendReasonPhrase;
|
|
}
|
|
/**
|
|
* Specifies whether the reason phrase will be sent in the response.
|
|
* By default a reason phrase will not be sent in the response.
|
|
*
|
|
* @param sendReasonPhrase specifies whether the reason phrase will be sent
|
|
* @deprecated This option will be removed in Tomcat 9. Reason phrase will
|
|
* not be sent.
|
|
*/
|
|
@Deprecated
|
|
public void setSendReasonPhrase(boolean sendReasonPhrase) {
|
|
this.sendReasonPhrase = sendReasonPhrase;
|
|
}
|
|
|
|
|
|
// ---------------------- Properties that are passed through to the EndPoint
|
|
|
|
@Override
|
|
public Executor getExecutor() { return endpoint.getExecutor(); }
|
|
public void setExecutor(Executor executor) {
|
|
endpoint.setExecutor(executor);
|
|
}
|
|
|
|
|
|
public int getMaxThreads() { return endpoint.getMaxThreads(); }
|
|
public void setMaxThreads(int maxThreads) {
|
|
endpoint.setMaxThreads(maxThreads);
|
|
}
|
|
|
|
public int getMaxConnections() { return endpoint.getMaxConnections(); }
|
|
public void setMaxConnections(int maxConnections) {
|
|
endpoint.setMaxConnections(maxConnections);
|
|
}
|
|
|
|
|
|
public int getMinSpareThreads() { return endpoint.getMinSpareThreads(); }
|
|
public void setMinSpareThreads(int minSpareThreads) {
|
|
endpoint.setMinSpareThreads(minSpareThreads);
|
|
}
|
|
|
|
|
|
public int getThreadPriority() { return endpoint.getThreadPriority(); }
|
|
public void setThreadPriority(int threadPriority) {
|
|
endpoint.setThreadPriority(threadPriority);
|
|
}
|
|
|
|
|
|
public int getAcceptCount() { return endpoint.getAcceptCount(); }
|
|
public void setAcceptCount(int acceptCount) { endpoint.setAcceptCount(acceptCount); }
|
|
@Deprecated
|
|
public int getBacklog() { return endpoint.getBacklog(); }
|
|
@Deprecated
|
|
public void setBacklog(int backlog) { endpoint.setBacklog(backlog); }
|
|
|
|
|
|
public boolean getTcpNoDelay() { return endpoint.getTcpNoDelay(); }
|
|
public void setTcpNoDelay(boolean tcpNoDelay) {
|
|
endpoint.setTcpNoDelay(tcpNoDelay);
|
|
}
|
|
|
|
|
|
public int getConnectionLinger() { return endpoint.getConnectionLinger(); }
|
|
public void setConnectionLinger(int connectionLinger) {
|
|
endpoint.setConnectionLinger(connectionLinger);
|
|
}
|
|
@Deprecated
|
|
public int getSoLinger() { return endpoint.getSoLinger(); }
|
|
@Deprecated
|
|
public void setSoLinger(int soLinger) { endpoint.setSoLinger(soLinger); }
|
|
|
|
|
|
public int getKeepAliveTimeout() { return endpoint.getKeepAliveTimeout(); }
|
|
public void setKeepAliveTimeout(int keepAliveTimeout) {
|
|
endpoint.setKeepAliveTimeout(keepAliveTimeout);
|
|
}
|
|
|
|
public InetAddress getAddress() { return endpoint.getAddress(); }
|
|
public void setAddress(InetAddress ia) {
|
|
endpoint.setAddress(ia);
|
|
}
|
|
|
|
|
|
public int getPort() { return endpoint.getPort(); }
|
|
public void setPort(int port) {
|
|
endpoint.setPort(port);
|
|
}
|
|
|
|
|
|
public int getLocalPort() { return endpoint.getLocalPort(); }
|
|
|
|
/*
|
|
* When Tomcat expects data from the client, this is the time Tomcat will
|
|
* wait for that data to arrive before closing the connection.
|
|
*/
|
|
public int getConnectionTimeout() {
|
|
return endpoint.getConnectionTimeout();
|
|
}
|
|
public void setConnectionTimeout(int timeout) {
|
|
endpoint.setConnectionTimeout(timeout);
|
|
}
|
|
|
|
@Deprecated
|
|
public int getSoTimeout() {
|
|
return getConnectionTimeout();
|
|
}
|
|
@Deprecated
|
|
public void setSoTimeout(int timeout) {
|
|
setConnectionTimeout(timeout);
|
|
}
|
|
|
|
public int getMaxHeaderCount() {
|
|
return endpoint.getMaxHeaderCount();
|
|
}
|
|
public void setMaxHeaderCount(int maxHeaderCount) {
|
|
endpoint.setMaxHeaderCount(maxHeaderCount);
|
|
}
|
|
|
|
public long getConnectionCount() {
|
|
return endpoint.getConnectionCount();
|
|
}
|
|
|
|
public void setAcceptorThreadCount(int threadCount) {
|
|
endpoint.setAcceptorThreadCount(threadCount);
|
|
}
|
|
public int getAcceptorThreadCount() {
|
|
return endpoint.getAcceptorThreadCount();
|
|
}
|
|
|
|
public void setAcceptorThreadPriority(int threadPriority) {
|
|
endpoint.setAcceptorThreadPriority(threadPriority);
|
|
}
|
|
public int getAcceptorThreadPriority() {
|
|
return endpoint.getAcceptorThreadPriority();
|
|
}
|
|
|
|
|
|
// ---------------------------------------------------------- Public methods
|
|
|
|
public synchronized int getNameIndex() {
|
|
if (nameIndex == 0) {
|
|
nameIndex = nameCounter.incrementAndGet();
|
|
}
|
|
|
|
return nameIndex;
|
|
}
|
|
|
|
|
|
/**
|
|
* The name will be prefix-address-port if address is non-null and
|
|
* prefix-port if the address is null.
|
|
*
|
|
* @return A name for this protocol instance that is appropriately quoted
|
|
* for use in an ObjectName.
|
|
*/
|
|
public String getName() {
|
|
return ObjectName.quote(getNameInternal());
|
|
}
|
|
|
|
|
|
private String getNameInternal() {
|
|
StringBuilder name = new StringBuilder(getNamePrefix());
|
|
name.append('-');
|
|
if (getAddress() != null) {
|
|
name.append(getAddress().getHostAddress());
|
|
name.append('-');
|
|
}
|
|
int port = getPort();
|
|
if (port == 0) {
|
|
// Auto binding is in use. Check if port is known
|
|
name.append("auto-");
|
|
name.append(getNameIndex());
|
|
port = getLocalPort();
|
|
if (port != -1) {
|
|
name.append('-');
|
|
name.append(port);
|
|
}
|
|
} else {
|
|
name.append(port);
|
|
}
|
|
return name.toString();
|
|
}
|
|
|
|
|
|
public void addWaitingProcessor(Processor processor) {
|
|
if (getLog().isDebugEnabled()) {
|
|
getLog().debug(sm.getString("abstractProcotol.waitingProcerssor.add", processor));
|
|
}
|
|
waitingProcessors.add(processor);
|
|
}
|
|
|
|
|
|
public void removeWaitingProcessor(Processor processor) {
|
|
if (getLog().isDebugEnabled()) {
|
|
getLog().debug(sm.getString("abstractProcotol.waitingProcerssor.remove", processor));
|
|
}
|
|
waitingProcessors.remove(processor);
|
|
}
|
|
|
|
|
|
// ----------------------------------------------- Accessors for sub-classes
|
|
|
|
protected AbstractEndpoint<S> getEndpoint() {
|
|
return endpoint;
|
|
}
|
|
|
|
|
|
protected Handler<S> getHandler() {
|
|
return handler;
|
|
}
|
|
|
|
protected void setHandler(Handler<S> handler) {
|
|
this.handler = handler;
|
|
}
|
|
|
|
|
|
// -------------------------------------------------------- Abstract methods
|
|
|
|
/**
|
|
* Concrete implementations need to provide access to their logger to be
|
|
* used by the abstract classes.
|
|
* @return the logger
|
|
*/
|
|
protected abstract Log getLog();
|
|
|
|
|
|
/**
|
|
* Obtain the prefix to be used when construction a name for this protocol
|
|
* handler. The name will be prefix-address-port.
|
|
* @return the prefix
|
|
*/
|
|
protected abstract String getNamePrefix();
|
|
|
|
|
|
/**
|
|
* Obtain the name of the protocol, (Http, Ajp, etc.). Used with JMX.
|
|
* @return the protocol name
|
|
*/
|
|
protected abstract String getProtocolName();
|
|
|
|
|
|
/**
|
|
* Find a suitable handler for the protocol negotiated
|
|
* at the network layer.
|
|
* @param name The name of the requested negotiated protocol.
|
|
* @return The instance where {@link UpgradeProtocol#getAlpnName()} matches
|
|
* the requested protocol
|
|
*/
|
|
protected abstract UpgradeProtocol getNegotiatedProtocol(String name);
|
|
|
|
|
|
/**
|
|
* Find a suitable handler for the protocol upgraded name specified. This
|
|
* is used for direct connection protocol selection.
|
|
* @param name The name of the requested negotiated protocol.
|
|
* @return The instance where {@link UpgradeProtocol#getAlpnName()} matches
|
|
* the requested protocol
|
|
*/
|
|
protected abstract UpgradeProtocol getUpgradeProtocol(String name);
|
|
|
|
|
|
/**
|
|
* Create and configure a new Processor instance for the current protocol
|
|
* implementation.
|
|
*
|
|
* @return A fully configured Processor instance that is ready to use
|
|
*/
|
|
protected abstract Processor createProcessor();
|
|
|
|
|
|
protected abstract Processor createUpgradeProcessor(
|
|
SocketWrapperBase<?> socket,
|
|
UpgradeToken upgradeToken);
|
|
|
|
|
|
// ----------------------------------------------------- JMX related methods
|
|
|
|
protected String domain;
|
|
protected ObjectName oname;
|
|
protected MBeanServer mserver;
|
|
|
|
public ObjectName getObjectName() {
|
|
return oname;
|
|
}
|
|
|
|
public String getDomain() {
|
|
return domain;
|
|
}
|
|
|
|
@Override
|
|
public ObjectName preRegister(MBeanServer server, ObjectName name)
|
|
throws Exception {
|
|
oname = name;
|
|
mserver = server;
|
|
domain = name.getDomain();
|
|
return name;
|
|
}
|
|
|
|
@Override
|
|
public void postRegister(Boolean registrationDone) {
|
|
// NOOP
|
|
}
|
|
|
|
@Override
|
|
public void preDeregister() throws Exception {
|
|
// NOOP
|
|
}
|
|
|
|
@Override
|
|
public void postDeregister() {
|
|
// NOOP
|
|
}
|
|
|
|
private ObjectName createObjectName() throws MalformedObjectNameException {
|
|
// Use the same domain as the connector
|
|
domain = getAdapter().getDomain();
|
|
|
|
if (domain == null) {
|
|
return null;
|
|
}
|
|
|
|
StringBuilder name = new StringBuilder(getDomain());
|
|
name.append(":type=ProtocolHandler,port=");
|
|
int port = getPort();
|
|
if (port > 0) {
|
|
name.append(getPort());
|
|
} else {
|
|
name.append("auto-");
|
|
name.append(getNameIndex());
|
|
}
|
|
InetAddress address = getAddress();
|
|
if (address != null) {
|
|
name.append(",address=");
|
|
name.append(ObjectName.quote(address.getHostAddress()));
|
|
}
|
|
return new ObjectName(name.toString());
|
|
}
|
|
|
|
|
|
// ------------------------------------------------------- Lifecycle methods
|
|
|
|
/*
|
|
* NOTE: There is no maintenance of state or checking for valid transitions
|
|
* within this class. It is expected that the connector will maintain state
|
|
* and prevent invalid state transitions.
|
|
*/
|
|
|
|
@Override
|
|
public void init() throws Exception {
|
|
if (getLog().isInfoEnabled()) {
|
|
getLog().info(sm.getString("abstractProtocolHandler.init", getName()));
|
|
}
|
|
|
|
if (oname == null) {
|
|
// Component not pre-registered so register it
|
|
oname = createObjectName();
|
|
if (oname != null) {
|
|
Registry.getRegistry(null, null).registerComponent(this, oname, null);
|
|
}
|
|
}
|
|
|
|
if (this.domain != null) {
|
|
rgOname = new ObjectName(domain + ":type=GlobalRequestProcessor,name=" + getName());
|
|
Registry.getRegistry(null, null).registerComponent(
|
|
getHandler().getGlobal(), rgOname, null);
|
|
}
|
|
|
|
String endpointName = getName();
|
|
endpoint.setName(endpointName.substring(1, endpointName.length()-1));
|
|
endpoint.setDomain(domain);
|
|
|
|
endpoint.init();
|
|
}
|
|
|
|
|
|
@Override
|
|
public void start() throws Exception {
|
|
if (getLog().isInfoEnabled()) {
|
|
getLog().info(sm.getString("abstractProtocolHandler.start", getName()));
|
|
}
|
|
|
|
endpoint.start();
|
|
|
|
// Start timeout thread
|
|
asyncTimeout = new AsyncTimeout();
|
|
Thread timeoutThread = new Thread(asyncTimeout, getNameInternal() + "-AsyncTimeout");
|
|
int priority = endpoint.getThreadPriority();
|
|
if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
|
|
priority = Thread.NORM_PRIORITY;
|
|
}
|
|
timeoutThread.setPriority(priority);
|
|
timeoutThread.setDaemon(true);
|
|
timeoutThread.start();
|
|
}
|
|
|
|
|
|
@Override
|
|
public void pause() throws Exception {
|
|
if (getLog().isInfoEnabled()) {
|
|
getLog().info(sm.getString("abstractProtocolHandler.pause", getName()));
|
|
}
|
|
|
|
endpoint.pause();
|
|
}
|
|
|
|
|
|
@Override
|
|
public void resume() throws Exception {
|
|
if(getLog().isInfoEnabled()) {
|
|
getLog().info(sm.getString("abstractProtocolHandler.resume", getName()));
|
|
}
|
|
|
|
endpoint.resume();
|
|
}
|
|
|
|
|
|
@Override
|
|
public void stop() throws Exception {
|
|
if(getLog().isInfoEnabled()) {
|
|
getLog().info(sm.getString("abstractProtocolHandler.stop", getName()));
|
|
}
|
|
|
|
if (asyncTimeout != null) {
|
|
asyncTimeout.stop();
|
|
}
|
|
|
|
endpoint.stop();
|
|
}
|
|
|
|
|
|
@Override
|
|
public void destroy() throws Exception {
|
|
if(getLog().isInfoEnabled()) {
|
|
getLog().info(sm.getString("abstractProtocolHandler.destroy", getName()));
|
|
}
|
|
|
|
try {
|
|
endpoint.destroy();
|
|
} finally {
|
|
if (oname != null) {
|
|
if (mserver == null) {
|
|
Registry.getRegistry(null, null).unregisterComponent(oname);
|
|
} else {
|
|
// Possibly registered with a different MBeanServer
|
|
try {
|
|
mserver.unregisterMBean(oname);
|
|
} catch (MBeanRegistrationException | InstanceNotFoundException e) {
|
|
getLog().info(sm.getString("abstractProtocol.mbeanDeregistrationFailed",
|
|
oname, mserver));
|
|
}
|
|
}
|
|
}
|
|
|
|
if (rgOname != null) {
|
|
Registry.getRegistry(null, null).unregisterComponent(rgOname);
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
public void closeServerSocketGraceful() {
|
|
endpoint.closeServerSocketGraceful();
|
|
}
|
|
|
|
|
|
// ------------------------------------------- Connection handler base class
|
|
|
|
protected static class ConnectionHandler<S> implements AbstractEndpoint.Handler<S> {
|
|
|
|
private final AbstractProtocol<S> proto;
|
|
private final RequestGroupInfo global = new RequestGroupInfo();
|
|
private final AtomicLong registerCount = new AtomicLong(0);
|
|
private final Map<S,Processor> connections = new ConcurrentHashMap<>();
|
|
private final RecycledProcessors recycledProcessors = new RecycledProcessors(this);
|
|
|
|
public ConnectionHandler(AbstractProtocol<S> proto) {
|
|
this.proto = proto;
|
|
}
|
|
|
|
protected AbstractProtocol<S> getProtocol() {
|
|
return proto;
|
|
}
|
|
|
|
protected Log getLog() {
|
|
return getProtocol().getLog();
|
|
}
|
|
|
|
@Override
|
|
public Object getGlobal() {
|
|
return global;
|
|
}
|
|
|
|
@Override
|
|
public void recycle() {
|
|
recycledProcessors.clear();
|
|
}
|
|
|
|
|
|
@Override
|
|
public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
|
|
if (getLog().isDebugEnabled()) {
|
|
getLog().debug(sm.getString("abstractConnectionHandler.process",
|
|
wrapper.getSocket(), status));
|
|
}
|
|
if (wrapper == null) {
|
|
// Nothing to do. Socket has been closed.
|
|
return SocketState.CLOSED;
|
|
}
|
|
|
|
S socket = wrapper.getSocket();
|
|
|
|
Processor processor = connections.get(socket);
|
|
if (getLog().isDebugEnabled()) {
|
|
getLog().debug(sm.getString("abstractConnectionHandler.connectionsGet",
|
|
processor, socket));
|
|
}
|
|
|
|
// Timeouts are calculated on a dedicated thread and then
|
|
// dispatched. Because of delays in the dispatch process, the
|
|
// timeout may no longer be required. Check here and avoid
|
|
// unnecessary processing.
|
|
if (SocketEvent.TIMEOUT == status &&
|
|
(processor == null ||
|
|
!processor.isAsync() && !processor.isUpgrade() ||
|
|
processor.isAsync() && !processor.checkAsyncTimeoutGeneration())) {
|
|
// This is effectively a NO-OP
|
|
return SocketState.OPEN;
|
|
}
|
|
|
|
if (processor != null) {
|
|
// Make sure an async timeout doesn't fire
|
|
getProtocol().removeWaitingProcessor(processor);
|
|
} else if (status == SocketEvent.DISCONNECT || status == SocketEvent.ERROR) {
|
|
// Nothing to do. Endpoint requested a close and there is no
|
|
// longer a processor associated with this socket.
|
|
return SocketState.CLOSED;
|
|
}
|
|
|
|
ContainerThreadMarker.set();
|
|
|
|
try {
|
|
if (processor == null) {
|
|
String negotiatedProtocol = wrapper.getNegotiatedProtocol();
|
|
// OpenSSL typically returns null whereas JSSE typically
|
|
// returns "" when no protocol is negotiated
|
|
if (negotiatedProtocol != null && negotiatedProtocol.length() > 0) {
|
|
UpgradeProtocol upgradeProtocol = getProtocol().getNegotiatedProtocol(negotiatedProtocol);
|
|
if (upgradeProtocol != null) {
|
|
processor = upgradeProtocol.getProcessor(wrapper, getProtocol().getAdapter());
|
|
if (getLog().isDebugEnabled()) {
|
|
getLog().debug(sm.getString("abstractConnectionHandler.processorCreate", processor));
|
|
}
|
|
} else if (negotiatedProtocol.equals("http/1.1")) {
|
|
// Explicitly negotiated the default protocol.
|
|
// Obtain a processor below.
|
|
} else {
|
|
// TODO:
|
|
// OpenSSL 1.0.2's ALPN callback doesn't support
|
|
// failing the handshake with an error if no
|
|
// protocol can be negotiated. Therefore, we need to
|
|
// fail the connection here. Once this is fixed,
|
|
// replace the code below with the commented out
|
|
// block.
|
|
if (getLog().isDebugEnabled()) {
|
|
getLog().debug(sm.getString("abstractConnectionHandler.negotiatedProcessor.fail",
|
|
negotiatedProtocol));
|
|
}
|
|
return SocketState.CLOSED;
|
|
/*
|
|
* To replace the code above once OpenSSL 1.1.0 is
|
|
* used.
|
|
// Failed to create processor. This is a bug.
|
|
throw new IllegalStateException(sm.getString(
|
|
"abstractConnectionHandler.negotiatedProcessor.fail",
|
|
negotiatedProtocol));
|
|
*/
|
|
}
|
|
}
|
|
}
|
|
if (processor == null) {
|
|
processor = recycledProcessors.pop();
|
|
if (getLog().isDebugEnabled()) {
|
|
getLog().debug(sm.getString("abstractConnectionHandler.processorPop", processor));
|
|
}
|
|
}
|
|
if (processor == null) {
|
|
processor = getProtocol().createProcessor();
|
|
register(processor);
|
|
if (getLog().isDebugEnabled()) {
|
|
getLog().debug(sm.getString("abstractConnectionHandler.processorCreate", processor));
|
|
}
|
|
}
|
|
|
|
processor.setSslSupport(
|
|
wrapper.getSslSupport(getProtocol().getClientCertProvider()));
|
|
|
|
// Associate the processor with the connection
|
|
connections.put(socket, processor);
|
|
|
|
SocketState state = SocketState.CLOSED;
|
|
do {
|
|
state = processor.process(wrapper, status);
|
|
|
|
if (state == SocketState.UPGRADING) {
|
|
// Get the HTTP upgrade handler
|
|
UpgradeToken upgradeToken = processor.getUpgradeToken();
|
|
// Retrieve leftover input
|
|
ByteBuffer leftOverInput = processor.getLeftoverInput();
|
|
if (upgradeToken == null) {
|
|
// Assume direct HTTP/2 connection
|
|
UpgradeProtocol upgradeProtocol = getProtocol().getUpgradeProtocol("h2c");
|
|
if (upgradeProtocol != null) {
|
|
processor = upgradeProtocol.getProcessor(
|
|
wrapper, getProtocol().getAdapter());
|
|
wrapper.unRead(leftOverInput);
|
|
// Associate with the processor with the connection
|
|
connections.put(socket, processor);
|
|
} else {
|
|
if (getLog().isDebugEnabled()) {
|
|
getLog().debug(sm.getString(
|
|
"abstractConnectionHandler.negotiatedProcessor.fail",
|
|
"h2c"));
|
|
}
|
|
return SocketState.CLOSED;
|
|
}
|
|
} else {
|
|
HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();
|
|
// Release the Http11 processor to be re-used
|
|
release(processor);
|
|
// Create the upgrade processor
|
|
processor = getProtocol().createUpgradeProcessor(wrapper, upgradeToken);
|
|
if (getLog().isDebugEnabled()) {
|
|
getLog().debug(sm.getString("abstractConnectionHandler.upgradeCreate",
|
|
processor, wrapper));
|
|
}
|
|
wrapper.unRead(leftOverInput);
|
|
// Mark the connection as upgraded
|
|
wrapper.setUpgraded(true);
|
|
// Associate with the processor with the connection
|
|
connections.put(socket, processor);
|
|
// Initialise the upgrade handler (which may trigger
|
|
// some IO using the new protocol which is why the lines
|
|
// above are necessary)
|
|
// This cast should be safe. If it fails the error
|
|
// handling for the surrounding try/catch will deal with
|
|
// it.
|
|
if (upgradeToken.getInstanceManager() == null) {
|
|
httpUpgradeHandler.init((WebConnection) processor);
|
|
} else {
|
|
ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);
|
|
try {
|
|
httpUpgradeHandler.init((WebConnection) processor);
|
|
} finally {
|
|
upgradeToken.getContextBind().unbind(false, oldCL);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
} while ( state == SocketState.UPGRADING);
|
|
|
|
if (state == SocketState.LONG) {
|
|
// In the middle of processing a request/response. Keep the
|
|
// socket associated with the processor. Exact requirements
|
|
// depend on type of long poll
|
|
longPoll(wrapper, processor);
|
|
if (processor.isAsync()) {
|
|
getProtocol().addWaitingProcessor(processor);
|
|
}
|
|
} else if (state == SocketState.OPEN) {
|
|
// In keep-alive but between requests. OK to recycle
|
|
// processor. Continue to poll for the next request.
|
|
connections.remove(socket);
|
|
release(processor);
|
|
wrapper.registerReadInterest();
|
|
} else if (state == SocketState.SENDFILE) {
|
|
// Sendfile in progress. If it fails, the socket will be
|
|
// closed. If it works, the socket either be added to the
|
|
// poller (or equivalent) to await more data or processed
|
|
// if there are any pipe-lined requests remaining.
|
|
} else if (state == SocketState.UPGRADED) {
|
|
// Don't add sockets back to the poller if this was a
|
|
// non-blocking write otherwise the poller may trigger
|
|
// multiple read events which may lead to thread starvation
|
|
// in the connector. The write() method will add this socket
|
|
// to the poller if necessary.
|
|
if (status != SocketEvent.OPEN_WRITE) {
|
|
longPoll(wrapper, processor);
|
|
getProtocol().addWaitingProcessor(processor);
|
|
}
|
|
} else if (state == SocketState.SUSPENDED) {
|
|
// Don't add sockets back to the poller.
|
|
// The resumeProcessing() method will add this socket
|
|
// to the poller.
|
|
} else {
|
|
// Connection closed. OK to recycle the processor.
|
|
// Processors handling upgrades require additional clean-up
|
|
// before release.
|
|
connections.remove(socket);
|
|
if (processor.isUpgrade()) {
|
|
UpgradeToken upgradeToken = processor.getUpgradeToken();
|
|
HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();
|
|
InstanceManager instanceManager = upgradeToken.getInstanceManager();
|
|
if (instanceManager == null) {
|
|
httpUpgradeHandler.destroy();
|
|
} else {
|
|
ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);
|
|
try {
|
|
httpUpgradeHandler.destroy();
|
|
} finally {
|
|
try {
|
|
instanceManager.destroyInstance(httpUpgradeHandler);
|
|
} catch (Throwable e) {
|
|
ExceptionUtils.handleThrowable(e);
|
|
getLog().error(sm.getString("abstractConnectionHandler.error"), e);
|
|
}
|
|
upgradeToken.getContextBind().unbind(false, oldCL);
|
|
}
|
|
}
|
|
}
|
|
release(processor);
|
|
}
|
|
return state;
|
|
} catch(java.net.SocketException e) {
|
|
// SocketExceptions are normal
|
|
getLog().debug(sm.getString(
|
|
"abstractConnectionHandler.socketexception.debug"), e);
|
|
} catch (java.io.IOException e) {
|
|
// IOExceptions are normal
|
|
getLog().debug(sm.getString(
|
|
"abstractConnectionHandler.ioexception.debug"), e);
|
|
} catch (ProtocolException e) {
|
|
// Protocol exceptions normally mean the client sent invalid or
|
|
// incomplete data.
|
|
getLog().debug(sm.getString(
|
|
"abstractConnectionHandler.protocolexception.debug"), e);
|
|
}
|
|
// Future developers: if you discover any other
|
|
// rare-but-nonfatal exceptions, catch them here, and log as
|
|
// above.
|
|
catch (OutOfMemoryError oome) {
|
|
// Try and handle this here to give Tomcat a chance to close the
|
|
// connection and prevent clients waiting until they time out.
|
|
// Worst case, it isn't recoverable and the attempt at logging
|
|
// will trigger another OOME.
|
|
getLog().error(sm.getString("abstractConnectionHandler.oome"), oome);
|
|
} catch (Throwable e) {
|
|
ExceptionUtils.handleThrowable(e);
|
|
// any other exception or error is odd. Here we log it
|
|
// with "ERROR" level, so it will show up even on
|
|
// less-than-verbose logs.
|
|
getLog().error(sm.getString("abstractConnectionHandler.error"), e);
|
|
} finally {
|
|
ContainerThreadMarker.clear();
|
|
}
|
|
|
|
// Make sure socket/processor is removed from the list of current
|
|
// connections
|
|
connections.remove(socket);
|
|
release(processor);
|
|
return SocketState.CLOSED;
|
|
}
|
|
|
|
|
|
protected void longPoll(SocketWrapperBase<?> socket, Processor processor) {
|
|
if (!processor.isAsync()) {
|
|
// This is currently only used with HTTP
|
|
// Either:
|
|
// - this is an upgraded connection
|
|
// - the request line/headers have not been completely
|
|
// read
|
|
socket.registerReadInterest();
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
public Set<S> getOpenSockets() {
|
|
return connections.keySet();
|
|
}
|
|
|
|
|
|
/**
|
|
* Expected to be used by the handler once the processor is no longer
|
|
* required.
|
|
*
|
|
* @param processor Processor being released (that was associated with
|
|
* the socket)
|
|
*/
|
|
private void release(Processor processor) {
|
|
if (processor != null) {
|
|
processor.recycle();
|
|
// After recycling, only instances of UpgradeProcessorBase will
|
|
// return true for isUpgrade().
|
|
// Instances of UpgradeProcessorBase should not be added to
|
|
// recycledProcessors since that pool is only for AJP or HTTP
|
|
// processors
|
|
if (!processor.isUpgrade()) {
|
|
recycledProcessors.push(processor);
|
|
if (getLog().isDebugEnabled()) {
|
|
getLog().debug("Pushed Processor [" + processor + "]");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
* Expected to be used by the Endpoint to release resources on socket
|
|
* close, errors etc.
|
|
*/
|
|
@Override
|
|
public void release(SocketWrapperBase<S> socketWrapper) {
|
|
S socket = socketWrapper.getSocket();
|
|
Processor processor = connections.remove(socket);
|
|
release(processor);
|
|
}
|
|
|
|
|
|
protected void register(Processor processor) {
|
|
if (getProtocol().getDomain() != null) {
|
|
synchronized (this) {
|
|
try {
|
|
long count = registerCount.incrementAndGet();
|
|
RequestInfo rp =
|
|
processor.getRequest().getRequestProcessor();
|
|
rp.setGlobalProcessor(global);
|
|
ObjectName rpName = new ObjectName(
|
|
getProtocol().getDomain() +
|
|
":type=RequestProcessor,worker="
|
|
+ getProtocol().getName() +
|
|
",name=" + getProtocol().getProtocolName() +
|
|
"Request" + count);
|
|
if (getLog().isDebugEnabled()) {
|
|
getLog().debug("Register [" + processor + "] as [" + rpName + "]");
|
|
}
|
|
Registry.getRegistry(null, null).registerComponent(rp,
|
|
rpName, null);
|
|
rp.setRpName(rpName);
|
|
} catch (Exception e) {
|
|
getLog().warn(sm.getString("abstractProtocol.processorRegisterError"), e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
protected void unregister(Processor processor) {
|
|
if (getProtocol().getDomain() != null) {
|
|
synchronized (this) {
|
|
try {
|
|
Request r = processor.getRequest();
|
|
if (r == null) {
|
|
// Probably an UpgradeProcessor
|
|
return;
|
|
}
|
|
RequestInfo rp = r.getRequestProcessor();
|
|
rp.setGlobalProcessor(null);
|
|
ObjectName rpName = rp.getRpName();
|
|
if (getLog().isDebugEnabled()) {
|
|
getLog().debug("Unregister [" + rpName + "]");
|
|
}
|
|
Registry.getRegistry(null, null).unregisterComponent(
|
|
rpName);
|
|
rp.setRpName(null);
|
|
} catch (Exception e) {
|
|
getLog().warn(sm.getString("abstractProtocol.processorUnregisterError"), e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
@Override
|
|
public final void pause() {
|
|
/*
|
|
* Inform all the processors associated with current connections
|
|
* that the endpoint is being paused. Most won't care. Those
|
|
* processing multiplexed streams may wish to take action. For
|
|
* example, HTTP/2 may wish to stop accepting new streams.
|
|
*
|
|
* Note that even if the endpoint is resumed, there is (currently)
|
|
* no API to inform the Processors of this.
|
|
*/
|
|
for (Processor processor : connections.values()) {
|
|
processor.pause();
|
|
}
|
|
}
|
|
}
|
|
|
|
protected static class RecycledProcessors extends SynchronizedStack<Processor> {
|
|
|
|
private final transient ConnectionHandler<?> handler;
|
|
protected final AtomicInteger size = new AtomicInteger(0);
|
|
|
|
public RecycledProcessors(ConnectionHandler<?> handler) {
|
|
this.handler = handler;
|
|
}
|
|
|
|
@SuppressWarnings("sync-override") // Size may exceed cache size a bit
|
|
@Override
|
|
public boolean push(Processor processor) {
|
|
int cacheSize = handler.getProtocol().getProcessorCache();
|
|
boolean offer = cacheSize == -1 ? true : size.get() < cacheSize;
|
|
//avoid over growing our cache or add after we have stopped
|
|
boolean result = false;
|
|
if (offer) {
|
|
result = super.push(processor);
|
|
if (result) {
|
|
size.incrementAndGet();
|
|
}
|
|
}
|
|
if (!result) handler.unregister(processor);
|
|
return result;
|
|
}
|
|
|
|
@SuppressWarnings("sync-override") // OK if size is too big briefly
|
|
@Override
|
|
public Processor pop() {
|
|
Processor result = super.pop();
|
|
if (result != null) {
|
|
size.decrementAndGet();
|
|
}
|
|
return result;
|
|
}
|
|
|
|
@Override
|
|
public synchronized void clear() {
|
|
Processor next = pop();
|
|
while (next != null) {
|
|
handler.unregister(next);
|
|
next = pop();
|
|
}
|
|
super.clear();
|
|
size.set(0);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
* Async timeout thread
|
|
*/
|
|
protected class AsyncTimeout implements Runnable {
|
|
|
|
private volatile boolean asyncTimeoutRunning = true;
|
|
|
|
/**
|
|
* The background thread that checks async requests and fires the
|
|
* timeout if there has been no activity.
|
|
*/
|
|
@Override
|
|
public void run() {
|
|
|
|
// Loop until we receive a shutdown command
|
|
while (asyncTimeoutRunning) {
|
|
try {
|
|
Thread.sleep(1000);
|
|
} catch (InterruptedException e) {
|
|
// Ignore
|
|
}
|
|
long now = System.currentTimeMillis();
|
|
for (Processor processor : waitingProcessors) {
|
|
processor.timeoutAsync(now);
|
|
}
|
|
|
|
// Loop if endpoint is paused
|
|
while (endpoint.isPaused() && asyncTimeoutRunning) {
|
|
try {
|
|
Thread.sleep(1000);
|
|
} catch (InterruptedException e) {
|
|
// Ignore
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
protected void stop() {
|
|
asyncTimeoutRunning = false;
|
|
|
|
// Timeout any pending async request
|
|
for (Processor processor : waitingProcessors) {
|
|
processor.timeoutAsync(-1);
|
|
}
|
|
}
|
|
}
|
|
}
|