/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.transport;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.ReferenceDocs;
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.network.HandlingTimeTracker;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ForkingResponseHandlerRunnable;
import org.elasticsearch.transport.Header;
import org.elasticsearch.transport.InboundMessage;
import org.elasticsearch.transport.OutboundHandler;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.RequestHandlerRegistry;
import org.elasticsearch.transport.ResponseHandlerFailureTransportException;
import org.elasticsearch.transport.ResponseStatsConsumer;
import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.TcpTransportChannel;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportHandshaker;
import org.elasticsearch.transport.TransportKeepAlive;
import org.elasticsearch.transport.TransportLogger;
import org.elasticsearch.transport.TransportMessage;
import org.elasticsearch.transport.TransportMessageListener;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportSerializationException;

public class InboundHandler {
    // Class logger: used for slow-message warnings and for failures hit while dispatching messages.
    private static final Logger logger = LogManager.getLogger(InboundHandler.class);
    // Supplies the relative clock used for timing and the ThreadContext that is stashed per message.
    private final ThreadPool threadPool;
    // Passed to every TcpTransportChannel created for an inbound request.
    private final OutboundHandler outboundHandler;
    // Used to wrap message streams in a NamedWriteableAwareStreamInput (see namedWriteableStream).
    private final NamedWriteableRegistry namedWriteableRegistry;
    // Processes handshake requests and owns the response handlers of in-flight handshakes.
    private final TransportHandshaker handshaker;
    // Receives ping messages.
    private final TransportKeepAlive keepAlive;
    // Lookup of response handlers by request id.
    private final Transport.ResponseHandlers responseHandlers;
    // Lookup of request handler registrations by action name.
    private final Transport.RequestHandlers requestHandlers;
    // Records how long each non-ping message took to handle on the calling thread.
    private final HandlingTimeTracker handlingTimeTracker;
    // Asserted wherever a deserialization problem is detected: with assertions enabled, such a problem
    // trips an AssertionError unless this flag is true.
    private final boolean ignoreDeserializationErrors;
    // Starts as the no-op listener; setMessageListener allows replacing it exactly once.
    private volatile TransportMessageListener messageListener = TransportMessageListener.NOOP_LISTENER;
    // Handling times above this many milliseconds are logged at WARN; the initial Long.MAX_VALUE can never be exceeded.
    private volatile long slowLogThresholdMs = Long.MAX_VALUE;
    // Shared zero-length stream handed to response handlers when no message stream is opened.
    private static final StreamInput EMPTY_STREAM_INPUT = new ByteBufferStreamInput(ByteBuffer.wrap(BytesRef.EMPTY_BYTES));

    /**
     * Creates a handler wired to the given collaborators. Every argument is stored as-is in the
     * field of the same name; no other work happens here.
     *
     * @param threadPool                  source of relative time and of the thread context
     * @param outboundHandler             handed to the channels created for inbound requests
     * @param namedWriteableRegistry      registry used when wrapping message streams
     * @param handshaker                  handles handshake requests and handshake response handlers
     * @param keepAlive                   receives ping messages
     * @param requestHandlers             request handler registrations, looked up by action name
     * @param responseHandlers            response handlers, looked up by request id
     * @param handlingTimeTracker         receives the handling time of each non-ping message
     * @param ignoreDeserializationErrors when {@code false}, deserialization problems trip assertions
     */
    InboundHandler(
        ThreadPool threadPool,
        OutboundHandler outboundHandler,
        NamedWriteableRegistry namedWriteableRegistry,
        TransportHandshaker handshaker,
        TransportKeepAlive keepAlive,
        Transport.RequestHandlers requestHandlers,
        Transport.ResponseHandlers responseHandlers,
        HandlingTimeTracker handlingTimeTracker,
        boolean ignoreDeserializationErrors
    ) {
        // Infrastructure.
        this.threadPool = threadPool;
        this.handlingTimeTracker = handlingTimeTracker;
        this.namedWriteableRegistry = namedWriteableRegistry;
        this.ignoreDeserializationErrors = ignoreDeserializationErrors;
        // Message routing.
        this.requestHandlers = requestHandlers;
        this.responseHandlers = responseHandlers;
        this.handshaker = handshaker;
        this.keepAlive = keepAlive;
        this.outboundHandler = outboundHandler;
    }

    /**
     * Installs the message listener. This is only permitted while the no-op listener is still in place.
     *
     * @param listener the listener to install
     * @throws IllegalStateException if a listener other than the no-op one is already installed
     */
    void setMessageListener(TransportMessageListener listener) {
        if (this.messageListener == TransportMessageListener.NOOP_LISTENER) {
            this.messageListener = listener;
        } else {
            throw new IllegalStateException("Cannot set message listener twice");
        }
    }

    /**
     * Updates the threshold above which the handling of a single message is logged as slow.
     *
     * @param slowLogThreshold the new threshold; stored internally in milliseconds
     */
    void setSlowLogThreshold(TimeValue slowLogThreshold) {
        final long thresholdMillis = slowLogThreshold.getMillis();
        this.slowLogThresholdMs = thresholdMillis;
    }

    /**
     * Entry point for a message received on {@code channel}: marks the channel as accessed, logs the
     * message, and then either hands a ping to the keep-alive component or dispatches the message.
     *
     * @param channel the channel the message arrived on
     * @param message the received message
     * @throws Exception if dispatching the message fails
     */
    void inboundMessage(TcpChannel channel, InboundMessage message) throws Exception {
        final long receivedAtMillis = threadPool.rawRelativeTimeInMillis();
        channel.getChannelStats().markAccessed(receivedAtMillis);
        TransportLogger.logInboundMessage(channel, message);
        if (!message.isPing()) {
            messageReceived(channel, message, receivedAtMillis);
            return;
        }
        keepAlive.receiveKeepAlive(channel);
    }

    /**
     * Dispatches a non-ping message as either a request or a response.
     *
     * <p>The current thread context is stashed for the duration of the dispatch and replaced with the
     * headers carried by the message plus the remote address. Whatever the outcome, the elapsed time
     * is recorded and, if it exceeds the slow-log threshold, a warning is logged.
     *
     * @param channel   the channel the message arrived on
     * @param message   the received message; its variable header must already have been read
     * @param startTime relative time in milliseconds at which handling started
     * @throws IOException if handling the request or response fails with an I/O error
     */
    private void messageReceived(TcpChannel channel, InboundMessage message, long startTime) throws IOException {
        final InetSocketAddress remoteAddress = channel.getRemoteAddress();
        final Header header = message.getHeader();
        assert !header.needsToReadVariableHeader();
        // Stays null for requests and for responses without a registered handler; only used by the slow log.
        TransportResponseHandler<?> responseHandler = null;
        final ThreadContext threadContext = threadPool.getThreadContext();
        try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
            threadContext.setHeaders(header.getHeaders());
            threadContext.putTransient("_remote_address", remoteAddress);
            if (header.isRequest()) {
                handleRequest(channel, message);
            } else {
                assert !message.isShortCircuit();
                responseHandler = findResponseHandler(header);
                if (responseHandler != null) {
                    executeResponseHandler(message, responseHandler, remoteAddress);
                }
            }
        } finally {
            final long elapsedMillis = threadPool.rawRelativeTimeInMillis() - startTime;
            handlingTimeTracker.addHandlingTime(elapsedMillis);
            // Read the volatile threshold once so the comparison and the log line agree.
            final long warnThresholdMillis = slowLogThresholdMs;
            if (warnThresholdMillis > 0L && elapsedMillis > warnThresholdMillis) {
                logSlowMessage(message, elapsedMillis, warnThresholdMillis, responseHandler);
            }
        }
    }

    /**
     * Routes a response message to {@code responseHandler}.
     *
     * <p>The message stream is only opened when the message has content or was sent with a transport
     * version other than the current one; otherwise the handler is given a shared empty stream.
     *
     * @param message         the response message
     * @param responseHandler the handler registered for the response
     * @param remoteAddress   address of the sender
     * @throws IOException if opening the message stream fails
     */
    private void executeResponseHandler(InboundMessage message, TransportResponseHandler<?> responseHandler, InetSocketAddress remoteAddress) throws IOException {
        final Header header = message.getHeader();
        final boolean mustReadStream = message.getContentLength() > 0 || !header.getVersion().equals(TransportVersion.current());
        if (!mustReadStream) {
            // An error response is expected to take the stream-reading path below.
            assert !header.isError();
            handleResponse(remoteAddress, EMPTY_STREAM_INPUT, responseHandler, message);
            return;
        }
        final StreamInput responseStream = namedWriteableStream(message.openOrGetStreamInput());
        assert assertRemoteVersion(responseStream, header.getVersion());
        if (header.isError()) {
            handlerResponseError(responseStream, message, responseHandler);
        } else {
            handleResponse(remoteAddress, responseStream, responseHandler, message);
        }
    }

    /**
     * Looks up the handler for the response described by {@code header}.
     *
     * <p>Handshake responses are resolved through the handshaker. Other responses are resolved through
     * the registered response handlers; if none is found there and the response is an error, the
     * handshaker is consulted as a fallback.
     *
     * @param header header of the response message
     * @return the handler, or {@code null} if the lookup that was consulted returned none
     */
    private TransportResponseHandler<?> findResponseHandler(Header header) {
        if (!header.isHandshake()) {
            final TransportResponseHandler<? extends TransportResponse> registered = responseHandlers.onResponseReceived(
                header.getRequestId(),
                messageListener
            );
            if (registered != null || !header.isError()) {
                return registered;
            }
        }
        return handshaker.removeHandlerForHandshake(header.getRequestId());
    }

    /**
     * Logs a warning that handling {@code message} took longer than the configured threshold.
     *
     * @param message         the message that was slow to handle
     * @param took            handling time in milliseconds
     * @param logThreshold    the threshold, in milliseconds, that was exceeded
     * @param responseHandler the response handler involved; only mentioned for response messages
     */
    private static void logSlowMessage(InboundMessage message, long took, long logThreshold, TransportResponseHandler<?> responseHandler) {
        final boolean isResponse = !message.getHeader().isRequest();
        if (isResponse) {
            logger.warn(
                "handling response [{}] on handler [{}] took [{}ms] which is above the warn threshold of [{}ms]; for more information, see {}",
                message,
                responseHandler,
                took,
                logThreshold,
                ReferenceDocs.NETWORK_THREADING_MODEL
            );
        } else {
            logger.warn(
                "handling request [{}] took [{}ms] which is above the warn threshold of [{}ms]; for more information, see {}",
                message,
                took,
                logThreshold,
                ReferenceDocs.NETWORK_THREADING_MODEL
            );
        }
    }

    /**
     * Checks that nothing is left in {@code stream} after a request has been deserialized.
     *
     * @param stream    the stream the request was read from
     * @param requestId id of the request, used in the failure message
     * @param action    action name of the request, used in the failure message
     * @throws IOException           if probing the stream fails
     * @throws IllegalStateException if another byte could still be read from the stream
     */
    private void verifyRequestReadFully(StreamInput stream, long requestId, String action) throws IOException {
        // Probe for one more byte; -1 means there was nothing left to read.
        if (stream.read() == -1) {
            return;
        }
        final IllegalStateException notFullyRead = new IllegalStateException(
            "Message not fully read (request) for requestId [" + requestId + "], action [" + action + "], available [" + stream.available() + "]; resetting"
        );
        assert ignoreDeserializationErrors : notFullyRead;
        throw notFullyRead;
    }

    /**
     * Checks that nothing is left in {@code streamInput} after a response has been deserialized.
     *
     * @param header          header of the response, used in the failure message
     * @param responseHandler handler of the response, used in the failure message
     * @param streamInput     the stream the response was read from
     * @throws IOException           if probing the stream fails
     * @throws IllegalStateException if another byte could still be read from the stream
     */
    private void verifyResponseReadFully(Header header, TransportResponseHandler<?> responseHandler, StreamInput streamInput) throws IOException {
        // Probe for one more byte; -1 means there was nothing left to read.
        if (streamInput.read() == -1) {
            return;
        }
        final IllegalStateException notFullyRead = new IllegalStateException(
            "Message not fully read (response) for requestId [" + header.getRequestId() + "], handler [" + responseHandler + "], error [" + header.isError() + "]; resetting"
        );
        assert ignoreDeserializationErrors : notFullyRead;
        throw notFullyRead;
    }

    /**
     * Handles an inbound request message.
     *
     * <p>Handshake requests are delegated to {@link #handleHandshakeRequest}. For any other request a
     * response channel is created, the message listener is notified, and then:
     * <ul>
     *   <li>a short-circuited message is answered with the exception it carries;</li>
     *   <li>otherwise the request is deserialized, checked to have consumed the whole stream, and
     *       executed either on the calling thread (direct executor, inside a new trace context) or on
     *       the executor of the handler registration.</li>
     * </ul>
     * Any exception raised along the way is sent back to the caller as an error response. The reference
     * obtained when deserializing the request is always released before this method returns; a forked
     * execution takes its own reference.
     *
     * @param channel the channel the request arrived on
     * @param message the request message
     * @param <T>     the request type expected by the registered handler
     * @throws IOException if processing a handshake request fails with an I/O error
     */
    private <T extends TransportRequest> void handleRequest(TcpChannel channel, InboundMessage message) throws IOException {
        final Header header = message.getHeader();
        if (header.isHandshake()) {
            this.handleHandshakeRequest(channel, message);
            return;
        }
        final String action = header.getActionName();
        final long requestId = header.getRequestId();
        final RequestHandlerRegistry<T> reg = this.requestHandlers.getHandler(action);
        assert message.isShortCircuit() || reg != null : action;
        final TcpTransportChannel transportChannel = new TcpTransportChannel(
            this.outboundHandler,
            channel,
            action,
            requestId,
            header.getVersion(),
            header.getCompressionScheme(),
            reg == null ? ResponseStatsConsumer.NONE : reg,
            false,
            Releasables.assertOnce(message.takeBreakerReleaseControl())
        );
        try {
            this.messageListener.onRequestReceived(requestId, action);
            if (reg != null) {
                // NOTE(review): the extra 6 bytes presumably account for the fixed prefix that precedes the
                // network message size on the wire - confirm against the TCP header definition.
                reg.addRequestStats(header.getNetworkMessageSize() + 6);
            }
            if (message.isShortCircuit()) {
                InboundHandler.sendErrorResponse(action, transportChannel, message.getException());
                return;
            }
            assert reg != null;
            final StreamInput stream = this.namedWriteableStream(message.openOrGetStreamInput());
            assert InboundHandler.assertRemoteVersion(stream, header.getVersion());
            final T request;
            try {
                request = reg.newRequest(stream);
            } catch (Exception e) {
                assert this.ignoreDeserializationErrors : e;
                throw e;
            }
            try {
                request.remoteAddress(channel.getRemoteAddress());
                assert requestId > 0L;
                request.setRequestId(requestId);
                this.verifyRequestReadFully(stream, requestId, action);
                if (reg.getExecutor() == EsExecutors.DIRECT_EXECUTOR_SERVICE) {
                    try (ThreadContext.StoredContext ignored = this.threadPool.getThreadContext().newTraceContext()) {
                        InboundHandler.doHandleRequest(reg, request, transportChannel);
                    }
                } else {
                    this.handleRequestForking(request, reg, transportChannel);
                }
            } finally {
                // Release the reference held since deserialization.
                request.decRef();
            }
        } catch (Exception e) {
            InboundHandler.sendErrorResponse(action, transportChannel, e);
        }
    }

    /**
     * Runs the registered handler for {@code request} on the calling thread. An exception thrown by the
     * handler is not propagated; it is sent back over {@code channel} as an error response instead.
     *
     * @param reg     the handler registration for the request's action
     * @param request the deserialized request
     * @param channel the channel on which to respond
     * @param <T>     the request type
     */
    private static <T extends TransportRequest> void doHandleRequest(RequestHandlerRegistry<T> reg, T request, TransportChannel channel) {
        try {
            reg.processMessageReceived(request, channel);
        } catch (Exception handlerFailure) {
            final String action = reg.getAction();
            sendErrorResponse(action, channel, handlerFailure);
        }
    }

    /**
     * Executes {@code request} on the executor of its handler registration.
     *
     * <p>An additional reference to the request is taken before submission. The submitted task
     * releases it in {@code onAfter()}; if the task could not be handed to the executor because an
     * exception was thrown, the reference is released here.
     *
     * @param request the deserialized request
     * @param reg     the handler registration, which also provides the executor
     * @param channel the channel on which to respond
     * @param <T>     the request type
     */
    private <T extends TransportRequest> void handleRequestForking(final T request, final RequestHandlerRegistry<T> reg, final TransportChannel channel) {
        request.mustIncRef();
        boolean handedOver = false;
        try {
            final Executor executor = reg.getExecutor();
            final Runnable task = threadPool.getThreadContext().preserveContextWithTracing(new AbstractRunnable() {

                @Override
                public boolean isForceExecution() {
                    return reg.isForceExecution();
                }

                @Override
                protected void doRun() {
                    InboundHandler.doHandleRequest(reg, request, channel);
                }

                @Override
                public void onRejection(Exception e) {
                    InboundHandler.sendErrorResponse(reg.getAction(), channel, e);
                }

                @Override
                public void onFailure(Exception e) {
                    // doHandleRequest catches handler exceptions itself, so reaching this is unexpected.
                    assert false : e;
                    InboundHandler.sendErrorResponse(reg.getAction(), channel, e);
                }

                @Override
                public void onAfter() {
                    request.decRef();
                }
            });
            executor.execute(task);
            handedOver = true;
        } finally {
            if (!handedOver) {
                request.decRef();
            }
        }
    }

    /**
     * Handles a handshake request: notifies the message listener, opens the message stream, and passes
     * it to the handshaker together with a response channel. If the handshaker throws, a warning is
     * logged and the underlying channel is closed.
     *
     * @param channel the channel the handshake arrived on
     * @param message the handshake request message; must not be short-circuited
     * @throws IOException if opening the message stream fails
     */
    private void handleHandshakeRequest(TcpChannel channel, InboundMessage message) throws IOException {
        final String handshakeAction = "internal:tcp/handshake";
        final Header header = message.getHeader();
        assert header.actionName.equals(handshakeAction);
        final long requestId = header.getRequestId();
        messageListener.onRequestReceived(requestId, handshakeAction);
        assert !message.isShortCircuit();
        final StreamInput handshakeStream = namedWriteableStream(message.openOrGetStreamInput());
        assert assertRemoteVersion(handshakeStream, header.getVersion());
        final TcpTransportChannel transportChannel = new TcpTransportChannel(
            outboundHandler,
            channel,
            handshakeAction,
            requestId,
            header.getVersion(),
            header.getCompressionScheme(),
            ResponseStatsConsumer.NONE,
            true,
            Releasables.assertOnce(message.takeBreakerReleaseControl())
        );
        try {
            handshaker.handleHandshake(transportChannel, requestId, handshakeStream);
        } catch (Exception handshakeFailure) {
            logger.warn(
                () -> "error processing handshake version [" + header.getVersion() + "] received on [" + channel + "], closing channel",
                handshakeFailure
            );
            channel.close();
        }
    }

    /**
     * Sends {@code e} back over {@code transportChannel} as the response. If sending itself fails, the
     * failure is logged at WARN with {@code e} attached as a suppressed exception; nothing is rethrown.
     *
     * @param actionName       name of the action, used in the log message
     * @param transportChannel the channel on which to respond
     * @param e                the exception to send
     */
    private static void sendErrorResponse(String actionName, TransportChannel transportChannel, Exception e) {
        try {
            transportChannel.sendResponse(e);
        } catch (Exception sendFailure) {
            // Keep the original error visible alongside the failure to report it.
            sendFailure.addSuppressed(e);
            logger.warn(() -> "Failed to send error message back to client for action [" + actionName + "]", sendFailure);
        }
    }

    /**
     * Delivers a successful response to {@code handler}, either on the calling thread (direct executor)
     * or on the handler's own executor.
     *
     * <p>When forking, an extra reference to {@code inboundMessage} is taken first. It is released at
     * most once: by the response processing itself or, at the latest, by the task's {@code onAfter()}.
     *
     * @param remoteAddress  address of the sender
     * @param stream         stream from which the response is read
     * @param handler        the response handler
     * @param inboundMessage the message backing {@code stream}
     * @param <T>            the response type
     */
    private <T extends TransportResponse> void handleResponse(final InetSocketAddress remoteAddress, final StreamInput stream, final TransportResponseHandler<T> handler, final InboundMessage inboundMessage) {
        final Executor responseExecutor = handler.executor();
        if (responseExecutor != EsExecutors.DIRECT_EXECUTOR_SERVICE) {
            inboundMessage.mustIncRef();
            final Releasable releaseBuffer = Releasables.releaseOnce(inboundMessage::decRef);
            responseExecutor.execute(new ForkingResponseHandlerRunnable(handler, null) {

                @Override
                protected void doRun() {
                    InboundHandler.this.doHandleResponse(handler, remoteAddress, stream, inboundMessage.getHeader(), releaseBuffer);
                }

                @Override
                public void onAfter() {
                    Releasables.closeExpectNoException(releaseBuffer);
                }
            });
            return;
        }
        // Same thread: no extra reference was taken, so there is nothing to release afterwards.
        doHandleResponse(handler, remoteAddress, stream, inboundMessage.getHeader(), () -> {});
    }

    /**
     * Deserializes a response from {@code stream} and passes it to {@code handler}.
     *
     * <p>Reading the response, setting its remote address and verifying that the stream was fully
     * consumed all happen while {@code releaseResponseBuffer} is still open, because that releasable may
     * be what keeps the data behind {@code stream} alive; it is closed as soon as this phase ends. Any
     * failure in this phase is logged and reported to the handler as a
     * {@link TransportSerializationException}.
     *
     * <p>An exception thrown by {@link TransportResponseHandler#handleResponse} is reported to the
     * handler wrapped in a {@link ResponseHandlerFailureTransportException}. The response's reference is
     * released once the handler has been invoked, or immediately if post-read validation fails.
     *
     * @param handler               the response handler
     * @param remoteAddress         address of the sender, set on the response
     * @param stream                stream from which the response is read
     * @param header                header of the response message
     * @param releaseResponseBuffer closed once the response has been read and verified
     * @param <T>                   the response type
     */
    private <T extends TransportResponse> void doHandleResponse(TransportResponseHandler<T> handler, InetSocketAddress remoteAddress, StreamInput stream, Header header, Releasable releaseResponseBuffer) {
        final T response;
        try (Releasable ignored = releaseResponseBuffer) {
            response = handler.read(stream);
            boolean accepted = false;
            try {
                response.remoteAddress(remoteAddress);
                // Must run before the buffer is released: this reads from the stream.
                this.verifyResponseReadFully(header, handler, stream);
                accepted = true;
            } finally {
                if (!accepted) {
                    // The response never reaches the handler, so drop its reference here.
                    response.decRef();
                }
            }
        } catch (Exception e) {
            final TransportSerializationException serializationException = new TransportSerializationException(
                "Failed to deserialize response from handler [" + handler + "]",
                e
            );
            logger.warn(() -> "Failed to deserialize response from [" + remoteAddress + "]", serializationException);
            assert this.ignoreDeserializationErrors : e;
            InboundHandler.doHandleException(handler, serializationException);
            return;
        }
        try {
            handler.handleResponse(response);
        } catch (Exception e) {
            InboundHandler.doHandleException(handler, new ResponseHandlerFailureTransportException(e));
        } finally {
            response.decRef();
        }
    }

    /**
     * Handles an error response: reads the exception from {@code stream}, verifies that the stream was
     * fully consumed, and passes the result to {@link #handleException}.
     *
     * <p>If reading or verification fails, the failure is wrapped in a
     * {@link TransportSerializationException} and reported in place of the remote error. Anything that
     * is not already a {@link RemoteTransportException} is wrapped in one before being dispatched.
     *
     * @param stream  stream from which the exception is read
     * @param message the error response message
     * @param handler the response handler to notify
     */
    private void handlerResponseError(StreamInput stream, InboundMessage message, TransportResponseHandler<?> handler) {
        Exception error;
        try {
            error = stream.readException();
            this.verifyResponseReadFully(message.getHeader(), handler, stream);
        } catch (Exception e) {
            error = new TransportSerializationException(
                "Failed to deserialize exception response from stream for handler [" + handler + "]",
                e
            );
            assert this.ignoreDeserializationErrors : error;
        }
        final RemoteTransportException remoteException = error instanceof RemoteTransportException
            ? (RemoteTransportException) error
            : new RemoteTransportException(error.getMessage(), error);
        this.handleException(handler, remoteException);
    }

    /**
     * Delivers {@code transportException} to {@code handler}, either on the calling thread (direct
     * executor) or on the handler's own executor.
     *
     * @param handler            the response handler to notify
     * @param transportException the exception to deliver
     */
    private void handleException(final TransportResponseHandler<?> handler, final TransportException transportException) {
        final Executor handlerExecutor = handler.executor();
        if (handlerExecutor != EsExecutors.DIRECT_EXECUTOR_SERVICE) {
            handlerExecutor.execute(new ForkingResponseHandlerRunnable(handler, transportException) {

                @Override
                protected void doRun() {
                    InboundHandler.doHandleException(handler, transportException);
                }
            });
            return;
        }
        doHandleException(handler, transportException);
    }

    /**
     * Invokes {@code handler.handleException}. If the handler itself throws, that exception is attached
     * to {@code transportException} as suppressed and the result is logged at ERROR; nothing is rethrown.
     *
     * @param handler            the response handler to notify
     * @param transportException the exception to deliver
     */
    private static void doHandleException(TransportResponseHandler<?> handler, TransportException transportException) {
        try {
            handler.handleException(transportException);
        } catch (Exception handlerFailure) {
            transportException.addSuppressed(handlerFailure);
            logger.error(() -> "failed to handle exception response [" + handler + "]", transportException);
        }
    }

    /**
     * Wraps {@code delegate} in a stream that is aware of this handler's named-writeable registry.
     *
     * @param delegate the raw message stream
     * @return a new wrapping stream
     */
    private StreamInput namedWriteableStream(StreamInput delegate) {
        final StreamInput registryAware = new NamedWriteableAwareStreamInput(delegate, namedWriteableRegistry);
        return registryAware;
    }

    /**
     * Assertion helper: checks that the transport version of {@code in} equals {@code version}. Always
     * returns {@code true} so that it can itself be used as the condition of an {@code assert}.
     *
     * @param in      the stream to check
     * @param version the version taken from the message header
     * @return {@code true}, unless the assertion inside trips
     */
    static boolean assertRemoteVersion(StreamInput in, TransportVersion version) {
        assert version.equals(in.getTransportVersion())
            : "Stream version [" + in.getTransportVersion() + "] does not match version [" + version + "]";
        return true;
    }
}

