/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.producer.internals;

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.internals.BufferPool;
import org.apache.kafka.clients.producer.internals.BuiltInPartitioner;
import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
import org.apache.kafka.clients.producer.internals.IncompleteBatches;
import org.apache.kafka.clients.producer.internals.ProduceRequestResult;
import org.apache.kafka.clients.producer.internals.ProducerBatch;
import org.apache.kafka.clients.producer.internals.TransactionManager;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionRatioEstimator;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.CopyOnWriteMap;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.ProducerIdAndEpoch;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;

public class RecordAccumulator {
    // Used to create this class's logger and passed to every TopicInfo that getOrCreateDeque() creates.
    private final LogContext logContext;
    private final Logger log;
    // Set by close(); tryAppend() throws once it is true and batchReady() treats every batch as sendable.
    private volatile boolean closed;
    // Incremented by beginFlush(), decremented by awaitFlushCompletion(); read via flushInProgress().
    private final AtomicInteger flushesInProgress;
    // Polled (via appendsInProgress()) by abortIncompleteBatches() to decide whether to abort again.
    private final AtomicInteger appendsInProgress;
    // Passed to ProducerBatch.split() and to each topic's BuiltInPartitioner.
    private final int batchSize;
    // Compression handed to the records builder and to the compression-ratio estimator.
    private final CompressionType compression;
    // Wait threshold used by batchReady() for a batch that is not backing off.
    private final int lingerMs;
    // Wait threshold for a batch that has already been attempted (attempts() > 0).
    private final long retryBackoffMs;
    // Passed to ProducerBatch.hasReachedDeliveryTimeout() and added to createdMs to compute expiry times.
    private final int deliveryTimeoutMs;
    // A value <= 0 disables node latency tracking (see updateNodeLatencyStats and partitionReady).
    private final long partitionAvailabilityTimeoutMs;
    // When true, partitionReady() collects per-partition queue sizes for the built-in partitioner.
    private final boolean enableAdaptivePartitioning;
    // Buffer pool: source of the buffer metrics and the target of deallocate().
    private final BufferPool free;
    private final Time time;
    private final ApiVersions apiVersions;
    // Per-topic state, keyed by topic name.
    private final ConcurrentMap<String, TopicInfo> topicInfoMap = new CopyOnWriteMap<String, TopicInfo>();
    // Per-node ready/drain timestamps, keyed by node id.
    private final ConcurrentMap<Integer, NodeLatencyStats> nodeStats = new CopyOnWriteMap<Integer, NodeLatencyStats>();
    // Batches added by appendNewBatch()/splitAndReenqueue() and removed by deallocate().
    private final IncompleteBatches incomplete;
    // Partitions skipped by batchReady() and drainBatchesForOneNode().
    private final Set<TopicPartition> muted;
    // Last drain position per node, keyed by Node.idString().
    private final Map<String, Integer> nodesDrainIndex;
    // May be null; every use in this class is guarded by a null check or an earlier precondition.
    private final TransactionManager transactionManager;
    // Long.MAX_VALUE means no expiry time has been recorded since the last reset.
    private long nextBatchExpiryTimeMs = Long.MAX_VALUE;

    /**
     * Creates an accumulator with explicit partitioner settings.
     *
     * @param logContext supplies the logger and is forwarded to each topic's built-in partitioner
     * @param batchSize size used when splitting batches and by the built-in partitioner
     * @param compression compression applied to newly built batches
     * @param lingerMs wait threshold for batches that are not backing off
     * @param retryBackoffMs wait threshold for batches that have already been attempted
     * @param deliveryTimeoutMs timeout used when expiring batches
     * @param partitionerConfig adaptive-partitioning flag and partition-availability timeout
     * @param metrics registry on which the buffer metrics are registered
     * @param metricGrpName metric group name for the registered metrics
     * @param time clock instance
     * @param apiVersions source of the maximum usable produce magic
     * @param transactionManager transaction/idempotence state, or {@code null}
     * @param bufferPool pool that backs batch buffers
     */
    public RecordAccumulator(LogContext logContext,
                             int batchSize,
                             CompressionType compression,
                             int lingerMs,
                             long retryBackoffMs,
                             int deliveryTimeoutMs,
                             PartitionerConfig partitionerConfig,
                             Metrics metrics,
                             String metricGrpName,
                             Time time,
                             ApiVersions apiVersions,
                             TransactionManager transactionManager,
                             BufferPool bufferPool) {
        // Collaborators.
        this.logContext = logContext;
        this.log = logContext.logger(RecordAccumulator.class);
        this.time = time;
        this.apiVersions = apiVersions;
        this.transactionManager = transactionManager;
        this.free = bufferPool;

        // Configuration.
        this.batchSize = batchSize;
        this.compression = compression;
        this.lingerMs = lingerMs;
        this.retryBackoffMs = retryBackoffMs;
        this.deliveryTimeoutMs = deliveryTimeoutMs;
        this.enableAdaptivePartitioning = partitionerConfig.enableAdaptivePartitioning;
        this.partitionAvailabilityTimeoutMs = partitionerConfig.partitionAvailabilityTimeoutMs;

        // Mutable state.
        this.closed = false;
        this.flushesInProgress = new AtomicInteger(0);
        this.appendsInProgress = new AtomicInteger(0);
        this.incomplete = new IncompleteBatches();
        this.muted = new HashSet<>();
        this.nodesDrainIndex = new HashMap<>();

        // Registered last because the metric lambdas read this.free.
        registerMetrics(metrics, metricGrpName);
    }

    /**
     * Creates an accumulator using a default-constructed {@link PartitionerConfig}
     * (adaptive partitioning off, availability timeout 0).
     */
    public RecordAccumulator(LogContext logContext,
                             int batchSize,
                             CompressionType compression,
                             int lingerMs,
                             long retryBackoffMs,
                             int deliveryTimeoutMs,
                             Metrics metrics,
                             String metricGrpName,
                             Time time,
                             ApiVersions apiVersions,
                             TransactionManager transactionManager,
                             BufferPool bufferPool) {
        this(logContext,
             batchSize,
             compression,
             lingerMs,
             retryBackoffMs,
             deliveryTimeoutMs,
             new PartitionerConfig(),
             metrics,
             metricGrpName,
             time,
             apiVersions,
             transactionManager,
             bufferPool);
    }

    /**
     * Registers three buffer-pool gauges on {@code metrics}: waiting threads, total bytes
     * and available bytes. Each gauge reads its value from the buffer pool when measured.
     *
     * @param metrics registry that receives the metrics
     * @param metricGrpName group name used for all three metric names
     */
    private void registerMetrics(Metrics metrics, String metricGrpName) {
        metrics.addMetric(
            metrics.metricName(
                "waiting-threads",
                metricGrpName,
                "The number of user threads blocked waiting for buffer memory to enqueue their records"),
            (config, now) -> free.queued());

        metrics.addMetric(
            metrics.metricName(
                "buffer-total-bytes",
                metricGrpName,
                "The maximum amount of buffer memory the client can use (whether or not it is currently used)."),
            (config, now) -> free.totalMemory());

        metrics.addMetric(
            metrics.metricName(
                "buffer-available-bytes",
                metricGrpName,
                "The total amount of buffer memory that is not being used (either unallocated or in the free list)."),
            (config, now) -> free.availableMemory());
    }

    /**
     * Forwards {@code partition} to {@code callbacks}; a {@code null} callback is ignored.
     */
    private void setPartition(AppendCallbacks callbacks, int partition) {
        if (callbacks == null) {
            return;
        }
        callbacks.setPartition(partition);
    }

    /**
     * Checks whether the built-in partitioner no longer points at {@code partitionInfo}.
     *
     * <p>If the partitioner has not switched yet but every batch in {@code deque} is full
     * (or the deque is empty), the partitioner is asked to update with zero appended bytes
     * and switching enabled, and the check is repeated.
     *
     * @return {@code true} when the partitioner reports a changed partition
     */
    private boolean partitionChanged(String topic, TopicInfo topicInfo, BuiltInPartitioner.StickyPartitionInfo partitionInfo, Deque<ProducerBatch> deque, long nowMs, Cluster cluster) {
        BuiltInPartitioner partitioner = topicInfo.builtInPartitioner;

        if (partitioner.isPartitionChanged(partitionInfo)) {
            log.trace("Partition {} for topic {} switched by a concurrent append, retrying", partitionInfo.partition(), topic);
            return true;
        }

        if (!allBatchesFull(deque)) {
            return false;
        }

        // All batches are full: give the partitioner a chance to complete a switch now.
        partitioner.updatePartitionInfo(partitionInfo, 0, cluster, true);
        if (!partitioner.isPartitionChanged(partitionInfo)) {
            return false;
        }
        log.trace("Completed previously disabled switch for topic {} partition {}, retrying", topic, partitionInfo.partition());
        return true;
    }

    /**
     * Adds a record to the accumulator and returns the append result.
     *
     * <p>NOTE(review): the decompiler could not recover this method and left a stub that always
     * threw {@code IllegalStateException("Decompilation failed")}. The body below is rebuilt around
     * the helpers that exist in this class only to serve it ({@code setPartition},
     * {@code partitionChanged}, {@code tryAppend}, {@code appendNewBatch}, {@code allBatchesFull}) and
     * the two synthetic {@code lambda$append$N} bodies. Verify it against the release this class
     * was built from.
     *
     * @param topic topic the record is sent to
     * @param partition target partition, or {@code -1} to let the topic's built-in partitioner pick one
     * @param timestamp record timestamp
     * @param key record key
     * @param value record value
     * @param headers record headers; {@code null} is treated as no headers
     * @param callbacks callbacks for the record; told the effective partition before appending
     * @param maxTimeToBlock maximum time in milliseconds to block waiting for buffer memory
     * @param abortOnNewBatch when {@code true}, return a result with {@code abortForNewBatch} set
     *                        instead of creating a new batch
     * @param nowMs current time in milliseconds; refreshed after a buffer allocation
     * @param cluster current cluster view, forwarded to the built-in partitioner
     * @return the result of the append
     * @throws InterruptedException if interrupted while waiting for buffer memory
     */
    public RecordAppendResult append(String topic, int partition, long timestamp, byte[] key, byte[] value, Header[] headers, AppendCallbacks callbacks, long maxTimeToBlock, boolean abortOnNewBatch, long nowMs, Cluster cluster) throws InterruptedException {
        TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, k -> new TopicInfo(logContext, k, batchSize));

        // Track in-flight appends so abortIncompleteBatches() keeps looping until they finish.
        appendsInProgress.incrementAndGet();
        ByteBuffer buffer = null;
        if (headers == null) {
            headers = Record.EMPTY_HEADERS;
        }
        try {
            // Retry loop: the sticky partition may be switched by a concurrent append
            // between peeking it and taking the deque lock.
            while (true) {
                final BuiltInPartitioner.StickyPartitionInfo partitionInfo;
                final int effectivePartition;
                if (partition == -1) {
                    partitionInfo = topicInfo.builtInPartitioner.peekCurrentPartitionInfo(cluster);
                    effectivePartition = partitionInfo.partition();
                } else {
                    partitionInfo = null;
                    effectivePartition = partition;
                }

                // Let the caller know which partition is actually used.
                setPartition(callbacks, effectivePartition);

                Deque<ProducerBatch> dq = topicInfo.batches.computeIfAbsent(effectivePartition, k -> new ArrayDeque<>());
                synchronized (dq) {
                    if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster)) {
                        continue;
                    }
                    RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
                    if (appendResult != null) {
                        // Partition switching stays disabled while the queue has a non-full batch.
                        boolean enableSwitch = allBatchesFull(dq);
                        topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
                        return appendResult;
                    }
                }

                // No in-progress batch had room.
                if (abortOnNewBatch) {
                    // Signal the caller to call append again.
                    return new RecordAppendResult(null, false, false, true, 0);
                }

                if (buffer == null) {
                    byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
                    int size = Math.max(batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
                    log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, partition, maxTimeToBlock);
                    // Allocation happens outside the deque lock because it may block.
                    buffer = free.allocate(size, maxTimeToBlock);
                    // The allocation may have blocked, so refresh the clock.
                    nowMs = time.milliseconds();
                }

                synchronized (dq) {
                    if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster)) {
                        continue;
                    }
                    RecordAppendResult appendResult = appendNewBatch(topic, effectivePartition, dq, timestamp, key, value, headers, callbacks, buffer, nowMs);
                    if (appendResult.newBatchCreated) {
                        // The new batch now owns the buffer; do not hand it back in finally.
                        buffer = null;
                    }
                    boolean enableSwitch = allBatchesFull(dq);
                    topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
                    return appendResult;
                }
            }
        } finally {
            if (buffer != null) {
                free.deallocate(buffer);
            }
            appendsInProgress.decrementAndGet();
        }
    }

    /**
     * Appends the record to the last batch of {@code dq} if it fits, otherwise creates a new
     * batch backed by {@code buffer}, appends the record to it and queues it.
     *
     * @return the result from the existing batch, or a result with {@code newBatchCreated == true}
     *         when a new batch was created
     */
    private RecordAppendResult appendNewBatch(String topic, int partition, Deque<ProducerBatch> dq, long timestamp, byte[] key, byte[] value, Header[] headers, AppendCallbacks callbacks, ByteBuffer buffer, long nowMs) {
        assert partition != -1;

        // Another thread may have created a batch with room since the caller last checked.
        RecordAppendResult existing = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
        if (existing != null) {
            return existing;
        }

        byte magic = apiVersions.maxUsableProduceMagic();
        MemoryRecordsBuilder builder = recordsBuilder(buffer, magic);
        TopicPartition tp = new TopicPartition(topic, partition);
        ProducerBatch batch = new ProducerBatch(tp, builder, nowMs);
        FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers, callbacks, nowMs));

        dq.addLast(batch);
        incomplete.add(batch);

        boolean batchIsFull = dq.size() > 1 || batch.isFull();
        return new RecordAppendResult(future, batchIsFull, true, false, batch.estimatedSizeInBytes());
    }

    /**
     * Creates a records builder over {@code buffer} with this accumulator's compression,
     * {@code CREATE_TIME} timestamps and base offset 0.
     *
     * @throws UnsupportedVersionException if a transaction manager is configured and
     *         {@code maxUsableMagic} is below 2
     */
    private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer, byte maxUsableMagic) {
        boolean hasTransactionManager = transactionManager != null;
        if (hasTransactionManager && maxUsableMagic < 2) {
            throw new UnsupportedVersionException("Attempting to use idempotence with a broker which does not support the required message format (v2). The broker must be version 0.11 or later.");
        }
        return MemoryRecords.builder(buffer, maxUsableMagic, compression, TimestampType.CREATE_TIME, 0L);
    }

    /**
     * Returns {@code true} when {@code deque} is empty or its last batch reports full.
     */
    private boolean allBatchesFull(Deque<ProducerBatch> deque) {
        ProducerBatch tail = deque.peekLast();
        if (tail == null) {
            return true;
        }
        return tail.isFull();
    }

    /**
     * Tries to append the record to the last batch in {@code deque}.
     *
     * <p>If that batch rejects the record it is closed for further appends.
     *
     * @return the append result, or {@code null} when the deque is empty or the last batch
     *         did not accept the record
     * @throws KafkaException if the accumulator has been closed
     */
    private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, Deque<ProducerBatch> deque, long nowMs) {
        if (closed) {
            throw new KafkaException("Producer closed while send in progress");
        }

        ProducerBatch tail = deque.peekLast();
        if (tail == null) {
            return null;
        }

        int sizeBefore = tail.estimatedSizeInBytes();
        FutureRecordMetadata future = tail.tryAppend(timestamp, key, value, headers, callback, nowMs);
        if (future == null) {
            tail.closeForRecordAppends();
            return null;
        }

        int appendedBytes = tail.estimatedSizeInBytes() - sizeBefore;
        boolean batchIsFull = deque.size() > 1 || tail.isFull();
        return new RecordAppendResult(future, batchIsFull, false, false, appendedBytes);
    }

    /** Returns whether {@code tp} is currently in the muted set. */
    private boolean isMuted(TopicPartition tp) {
        return muted.contains(tp);
    }

    /** Clears the recorded next batch expiry time by resetting it to {@code Long.MAX_VALUE}. */
    public void resetNextBatchExpiryTime() {
        nextBatchExpiryTimeMs = Long.MAX_VALUE;
    }

    /**
     * Lowers the recorded next expiry time to {@code batch.createdMs + deliveryTimeoutMs} if that
     * is earlier. A non-positive sum is treated as an overflow: it is logged and ignored.
     */
    public void maybeUpdateNextBatchExpiryTime(ProducerBatch batch) {
        long batchExpiryMs = batch.createdMs + (long) deliveryTimeoutMs;
        if (batchExpiryMs <= 0L) {
            log.warn("Skipping next batch expiry time update due to addition overflow: batch.createMs={}, deliveryTimeoutMs={}", batch.createdMs, deliveryTimeoutMs);
            return;
        }
        nextBatchExpiryTimeMs = Math.min(nextBatchExpiryTimeMs, batchExpiryMs);
    }

    /**
     * Removes and returns the batches at the head of each partition queue that have reached the
     * delivery timeout at time {@code now}.
     *
     * <p>Each queue is scanned from the front under its own lock. The scan of a queue stops at the
     * first batch that has not timed out; that batch is used to update the next expiry time.
     * Removed batches have their record appends aborted.
     *
     * @param now current time in milliseconds
     * @return the expired batches, in the order they were removed
     */
    public List<ProducerBatch> expiredBatches(long now) {
        List<ProducerBatch> expired = new ArrayList<>();
        for (TopicInfo topicInfo : topicInfoMap.values()) {
            for (Deque<ProducerBatch> deque : topicInfo.batches.values()) {
                synchronized (deque) {
                    ProducerBatch head;
                    while ((head = deque.peekFirst()) != null) {
                        if (!head.hasReachedDeliveryTimeout(deliveryTimeoutMs, now)) {
                            maybeUpdateNextBatchExpiryTime(head);
                            break;
                        }
                        deque.poll();
                        head.abortRecordAppends();
                        expired.add(head);
                    }
                }
            }
        }
        return expired;
    }

    /** Returns the configured delivery timeout in milliseconds, widened to {@code long}. */
    public long getDeliveryTimeoutMs() {
        return deliveryTimeoutMs;
    }

    /**
     * Puts {@code batch} back into its partition queue after marking it re-enqueued at {@code now}.
     *
     * <p>Without a transaction manager the batch goes to the front of the queue; with one it is
     * inserted according to its base sequence.
     */
    public void reenqueue(ProducerBatch batch, long now) {
        batch.reenqueued(now);
        Deque<ProducerBatch> partitionQueue = getOrCreateDeque(batch.topicPartition);
        synchronized (partitionQueue) {
            if (transactionManager == null) {
                partitionQueue.addFirst(batch);
            } else {
                insertInSequenceOrder(partitionQueue, batch);
            }
        }
    }

    /**
     * Splits {@code bigBatch} into smaller batches and queues them on its partition.
     *
     * <p>The compression-ratio estimate for the topic is first raised to at least 1.0 using the
     * batch's observed ratio. The split batches are taken from last to first; each is added to
     * the incomplete set and then inserted under the partition queue's lock.
     *
     * @return the number of batches produced by the split
     */
    public int splitAndReenqueue(ProducerBatch bigBatch) {
        String topic = bigBatch.topicPartition.topic();
        float estimate = Math.max(1.0f, (float) bigBatch.compressionRatio());
        CompressionRatioEstimator.setEstimation(topic, compression, estimate);

        Deque<ProducerBatch> splits = bigBatch.split(batchSize);
        int numSplitBatches = splits.size();
        Deque<ProducerBatch> partitionQueue = getOrCreateDeque(bigBatch.topicPartition);

        for (ProducerBatch split = splits.pollLast(); split != null; split = splits.pollLast()) {
            incomplete.add(split);
            synchronized (partitionQueue) {
                if (transactionManager == null) {
                    partitionQueue.addFirst(split);
                } else {
                    transactionManager.addInFlightBatch(split);
                    insertInSequenceOrder(partitionQueue, split);
                }
            }
        }
        return numSplitBatches;
    }

    /**
     * Inserts {@code batch} into {@code deque} behind any leading batches that carry a smaller
     * base sequence, so sequenced batches at the head stay in ascending order.
     *
     * @throws IllegalStateException if the batch has no sequence (base sequence -1) or its
     *         partition has no in-flight batches tracked by the transaction manager
     */
    private void insertInSequenceOrder(Deque<ProducerBatch> deque, ProducerBatch batch) {
        if (batch.baseSequence() == -1) {
            throw new IllegalStateException("Trying to re-enqueue a batch which doesn't have a sequence even though idempotency is enabled.");
        }
        if (!transactionManager.hasInflightBatches(batch.topicPartition)) {
            throw new IllegalStateException("We are re-enqueueing a batch which is not tracked as part of the in flight requests. batch.topicPartition: " + batch.topicPartition + "; batch.baseSequence: " + batch.baseSequence());
        }

        ProducerBatch head = deque.peekFirst();
        boolean headPrecedesBatch = head != null && head.hasSequence() && head.baseSequence() < batch.baseSequence();
        if (!headPrecedesBatch) {
            deque.addFirst(batch);
            return;
        }

        // Pop every leading batch that must stay ahead of the incoming one.
        List<ProducerBatch> preceding = new ArrayList<>();
        while (deque.peekFirst() != null && deque.peekFirst().hasSequence() && deque.peekFirst().baseSequence() < batch.baseSequence()) {
            preceding.add(deque.pollFirst());
        }
        log.debug("Reordered incoming batch with sequence {} for partition {}. It was placed in the queue at position {}", batch.baseSequence(), batch.topicPartition, preceding.size());

        deque.addFirst(batch);
        // Push the popped batches back in reverse so they regain their original order.
        Collections.reverse(preceding);
        for (ProducerBatch earlier : preceding) {
            deque.addFirst(earlier);
        }
    }

    /**
     * Decides whether the head batch of {@code part} makes {@code leader} ready to send.
     *
     * <p>Nothing happens if the leader is already in {@code readyNodes} or the partition is muted.
     * Otherwise the leader is added when the batch is sendable and not backing off; if not, the
     * remaining wait time is folded into the returned delay.
     *
     * @return {@code nextReadyCheckDelayMs}, possibly lowered to the time left for this batch
     */
    private long batchReady(long nowMs, boolean exhausted, TopicPartition part, Node leader, long waitedTimeMs, boolean backingOff, boolean full, long nextReadyCheckDelayMs, Set<Node> readyNodes) {
        if (readyNodes.contains(leader) || isMuted(part)) {
            return nextReadyCheckDelayMs;
        }

        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
        boolean expired = waitedTimeMs >= timeToWaitMs;
        boolean transactionCompleting = transactionManager != null && transactionManager.isCompleting();
        boolean sendable = full || expired || exhausted || closed || flushInProgress() || transactionCompleting;

        if (sendable && !backingOff) {
            readyNodes.add(leader);
            return nextReadyCheckDelayMs;
        }

        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0L);
        return Math.min(timeLeftMs, nextReadyCheckDelayMs);
    }

    /**
     * Evaluates readiness for every partition queue of one topic.
     *
     * <p>For each partition with a non-empty queue this either records the topic in
     * {@code unknownLeaderTopics} (no leader known) or delegates to {@code batchReady}. When adaptive
     * partitioning is enabled and the topic has a queue for at least as many partitions as the
     * cluster reports, queue sizes of partitions with a known leader are collected and passed to
     * the topic's built-in partitioner at the end.
     *
     * @param cluster current cluster view
     * @param nowMs current time in milliseconds
     * @param topic topic being evaluated
     * @param topicInfo per-topic queues and partitioner
     * @param nextReadyCheckDelayMs delay accumulated so far
     * @param readyNodes output set of nodes with sendable data
     * @param unknownLeaderTopics output set of topics with queued data but no known leader
     * @return the updated next-ready-check delay
     */
    private long partitionReady(Cluster cluster, long nowMs, String topic, TopicInfo topicInfo, long nextReadyCheckDelayMs, Set<Node> readyNodes, Set<String> unknownLeaderTopics) {
        ConcurrentMap<Integer, Deque<ProducerBatch>> batches = topicInfo.batches;
        int[] queueSizes = null;
        int[] partitionIds = null;
        if (enableAdaptivePartitioning && batches.size() >= cluster.partitionsForTopic(topic).size()) {
            queueSizes = new int[batches.size()];
            partitionIds = new int[queueSizes.length];
        }
        int queueSizesIndex = -1;
        boolean exhausted = free.queued() > 0;
        for (Map.Entry<Integer, Deque<ProducerBatch>> entry : batches.entrySet()) {
            TopicPartition part = new TopicPartition(topic, entry.getKey());
            Node leader = cluster.leaderFor(part);
            if (leader != null && queueSizes != null) {
                // The increment must not live inside the assert: assertions are disabled by
                // default, which would leave the index at -1 and break the array write below.
                ++queueSizesIndex;
                assert queueSizesIndex < queueSizes.length;
                partitionIds[queueSizesIndex] = part.partition();
            }

            Deque<ProducerBatch> deque = entry.getValue();
            final long waitedTimeMs;
            final boolean backingOff;
            final int dequeSize;
            final boolean full;
            // Only read the head batch's state under the lock; the rest runs outside it.
            synchronized (deque) {
                ProducerBatch batch = deque.peekFirst();
                if (batch == null) {
                    continue;
                }
                waitedTimeMs = batch.waitedTimeMs(nowMs);
                backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
                dequeSize = deque.size();
                full = dequeSize > 1 || batch.isFull();
            }

            if (leader == null) {
                // Data is queued but there is nowhere to send it yet.
                unknownLeaderTopics.add(part.topic());
                continue;
            }
            if (queueSizes != null) {
                queueSizes[queueSizesIndex] = dequeSize;
            }
            if (partitionAvailabilityTimeoutMs > 0L) {
                NodeLatencyStats nodeLatencyStats = nodeStats.get(leader.id());
                if (nodeLatencyStats != null) {
                    // Read readyTimeMs before drainTimeMs, matching the original read order.
                    long readyTimeMs = nodeLatencyStats.readyTimeMs;
                    if (readyTimeMs - nodeLatencyStats.drainTimeMs > partitionAvailabilityTimeoutMs) {
                        // Step the index back so this slot is not counted as available.
                        --queueSizesIndex;
                    }
                }
            }
            nextReadyCheckDelayMs = batchReady(nowMs, exhausted, part, leader, waitedTimeMs, backingOff, full, nextReadyCheckDelayMs, readyNodes);
        }
        topicInfo.builtInPartitioner.updatePartitionLoadStats(queueSizes, partitionIds, queueSizesIndex + 1);
        return nextReadyCheckDelayMs;
    }

    /**
     * Runs the readiness check across all topics known to this accumulator.
     *
     * @param cluster current cluster view
     * @param nowMs current time in milliseconds
     * @return the ready nodes, the delay until the next check (starting from
     *         {@code Long.MAX_VALUE}), and the topics with queued data but no known leader
     */
    public ReadyCheckResult ready(Cluster cluster, long nowMs) {
        Set<Node> readyNodes = new HashSet<>();
        Set<String> unknownLeaderTopics = new HashSet<>();
        long nextReadyCheckDelayMs = Long.MAX_VALUE;

        for (Map.Entry<String, TopicInfo> entry : topicInfoMap.entrySet()) {
            nextReadyCheckDelayMs = partitionReady(
                cluster, nowMs, entry.getKey(), entry.getValue(),
                nextReadyCheckDelayMs, readyNodes, unknownLeaderTopics);
        }
        return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
    }

    /**
     * Returns {@code true} if any partition queue still holds a batch. Each queue is inspected
     * under its own lock.
     */
    public boolean hasUndrained() {
        for (TopicInfo topicInfo : topicInfoMap.values()) {
            for (Deque<ProducerBatch> deque : topicInfo.batches.values()) {
                synchronized (deque) {
                    if (!deque.isEmpty()) {
                        return true;
                    }
                }
            }
        }
        return false;
    }

    /**
     * Asks the transaction manager whether draining must stop at {@code first}.
     *
     * <p>Always {@code false} without a transaction manager. Otherwise draining stops when sending
     * to the partition is not allowed, the producer id/epoch is not valid, an unsequenced batch
     * faces stale in-flight state or an unresolved sequence, or a sequenced batch is not the
     * first in-flight sequence for its partition.
     */
    private boolean shouldStopDrainBatchesForPartition(ProducerBatch first, TopicPartition tp) {
        if (transactionManager == null) {
            return false;
        }
        if (!transactionManager.isSendToPartitionAllowed(tp)) {
            return true;
        }
        if (!transactionManager.producerIdAndEpoch().isValid()) {
            return true;
        }

        if (!first.hasSequence()) {
            boolean staleInFlight = transactionManager.hasInflightBatches(tp) && transactionManager.hasStaleProducerIdAndEpoch(tp);
            if (staleInFlight) {
                return true;
            }
            if (transactionManager.hasUnresolvedSequence(first.topicPartition)) {
                return true;
            }
        }

        int firstInFlightSequence = transactionManager.firstInFlightSequence(first.topicPartition);
        return firstInFlightSequence != -1 && first.hasSequence() && first.baseSequence() != firstInFlightSequence;
    }

    /**
     * Collects batches to send to {@code node}, taking at most one batch per partition.
     *
     * <p>Partitions are visited round-robin starting from the drain index stored for the node.
     * A partition is skipped when it is muted, has no queue, has an empty queue, or its head
     * batch is still backing off. The loop ends when adding the next batch would exceed
     * {@code maxSize} (unless nothing has been collected yet), when the transaction manager says
     * to stop, or after one full pass.
     *
     * @param cluster current cluster view
     * @param node destination node
     * @param maxSize maximum total size in bytes to drain
     * @param now current time in milliseconds
     * @return the drained batches, closed and marked drained; empty if the node has no partitions
     */
    private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) {
        int size = 0;
        List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
        List<ProducerBatch> ready = new ArrayList<>();
        if (parts.isEmpty()) {
            // Nothing to drain; also avoids a modulo-by-zero on the drain index below.
            return ready;
        }
        int drainIndex = getDrainIndex(node.idString()) % parts.size();
        int start = drainIndex;
        do {
            PartitionInfo part = parts.get(drainIndex);
            TopicPartition tp = new TopicPartition(part.topic(), part.partition());
            updateDrainIndex(node.idString(), drainIndex);
            drainIndex = (drainIndex + 1) % parts.size();
            if (isMuted(tp)) {
                continue;
            }
            Deque<ProducerBatch> deque = getDeque(tp);
            if (deque == null) {
                continue;
            }
            final ProducerBatch batch;
            synchronized (deque) {
                ProducerBatch first = deque.peekFirst();
                if (first == null) {
                    continue;
                }
                boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs;
                if (backoff) {
                    continue;
                }
                // A first batch larger than maxSize is still drained so it can be sent alone.
                if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
                    break;
                }
                if (shouldStopDrainBatchesForPartition(first, tp)) {
                    break;
                }
                batch = deque.pollFirst();
                boolean isTransactional = transactionManager != null && transactionManager.isTransactional();
                ProducerIdAndEpoch producerIdAndEpoch = transactionManager != null ? transactionManager.producerIdAndEpoch() : null;
                if (producerIdAndEpoch != null && !batch.hasSequence()) {
                    // First drain of this batch: assign producer state and a sequence number.
                    transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition);
                    batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
                    transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
                    log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence {} being sent to partition {}", producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, batch.baseSequence(), tp);
                    transactionManager.addInFlightBatch(batch);
                }
            }
            // Closing and sizing the batch happens outside the deque lock.
            batch.close();
            size += batch.records().sizeInBytes();
            ready.add(batch);
            batch.drained(now);
        } while (start != drainIndex);
        return ready;
    }

    /** Returns the stored drain index for the node, storing 0 first if none exists. */
    private int getDrainIndex(String idString) {
        return nodesDrainIndex.computeIfAbsent(idString, unused -> 0);
    }

    /** Stores {@code drainIndex} as the drain position for the node with the given id string. */
    private void updateDrainIndex(String idString, int drainIndex) {
        nodesDrainIndex.put(idString, drainIndex);
    }

    /**
     * Drains batches for each of the given nodes.
     *
     * @param cluster current cluster view
     * @param nodes nodes to drain for
     * @param maxSize maximum number of bytes to drain per node
     * @param now current time in milliseconds
     * @return drained batches keyed by node id; an empty map when {@code nodes} is empty
     */
    public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
        if (nodes.isEmpty()) {
            return Collections.emptyMap();
        }

        Map<Integer, List<ProducerBatch>> drainedByNode = new HashMap<>();
        for (Node node : nodes) {
            List<ProducerBatch> drained = drainBatchesForOneNode(cluster, node, maxSize, now);
            drainedByNode.put(node.id(), drained);
        }
        return drainedByNode;
    }

    /**
     * Records that the node was checked at {@code nowMs}, creating its stats entry if needed.
     * The drain time is only updated when {@code canDrain} is true; the ready time always is.
     * Does nothing when the partition availability timeout is not positive.
     */
    public void updateNodeLatencyStats(Integer nodeId, long nowMs, boolean canDrain) {
        boolean trackingDisabled = partitionAvailabilityTimeoutMs <= 0L;
        if (trackingDisabled) {
            return;
        }

        NodeLatencyStats stats = nodeStats.computeIfAbsent(nodeId, id -> new NodeLatencyStats(nowMs));
        // Drain time is written before ready time, matching the original order.
        if (canDrain) {
            stats.drainTimeMs = nowMs;
        }
        stats.readyTimeMs = nowMs;
    }

    /** Returns the latency stats recorded for {@code nodeId}, or {@code null} if there are none. */
    public NodeLatencyStats getNodeLatencyStats(Integer nodeId) {
        return nodeStats.get(nodeId);
    }

    /**
     * Returns the built-in partitioner of {@code topic}.
     *
     * @throws NullPointerException if this accumulator holds no state for the topic
     */
    public BuiltInPartitioner getBuiltInPartitioner(String topic) {
        TopicInfo topicInfo = topicInfoMap.get(topic);
        return topicInfo.builtInPartitioner;
    }

    /** Returns the recorded next batch expiry time; {@code Long.MAX_VALUE} after a reset. */
    public long nextExpiryTimeMs() {
        return nextBatchExpiryTimeMs;
    }

    /**
     * Looks up the batch queue for {@code tp} without creating anything.
     *
     * @return the queue, or {@code null} if the topic or the partition has no entry
     */
    public Deque<ProducerBatch> getDeque(TopicPartition tp) {
        TopicInfo topicInfo = topicInfoMap.get(tp.topic());
        return topicInfo == null ? null : topicInfo.batches.get(tp.partition());
    }

    /** Returns the batch queue for {@code tp}, creating the topic state and queue if absent. */
    private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {
        TopicInfo topicInfo = topicInfoMap.computeIfAbsent(
            tp.topic(), topicName -> new TopicInfo(logContext, topicName, batchSize));
        return topicInfo.batches.computeIfAbsent(tp.partition(), partitionId -> new ArrayDeque<>());
    }

    /**
     * Removes {@code batch} from the incomplete set and returns its buffer to the pool,
     * unless the batch is a split batch.
     */
    public void deallocate(ProducerBatch batch) {
        incomplete.remove(batch);
        if (batch.isSplitBatch()) {
            return;
        }
        free.deallocate(batch.buffer(), batch.initialCapacity());
    }

    /** Returns the available memory reported by the buffer pool. */
    long bufferPoolAvailableMemory() {
        return free.availableMemory();
    }

    /** Returns whether at least one flush has been begun and not yet awaited to completion. */
    boolean flushInProgress() {
        int activeFlushes = flushesInProgress.get();
        return activeFlushes > 0;
    }

    /** Marks the start of a flush by incrementing the in-progress flush counter. */
    public void beginFlush() {
        flushesInProgress.incrementAndGet();
    }

    /** Returns whether the in-progress append counter is above zero. */
    private boolean appendsInProgress() {
        int activeAppends = appendsInProgress.get();
        return activeAppends > 0;
    }

    /**
     * Waits for every request result currently reported by the incomplete set, then decrements
     * the in-progress flush counter. The counter is decremented even if waiting is interrupted.
     *
     * @throws InterruptedException if interrupted while waiting on a result
     */
    public void awaitFlushCompletion() throws InterruptedException {
        try {
            Iterator<ProduceRequestResult> pending = incomplete.requestResults().iterator();
            while (pending.hasNext()) {
                pending.next().await();
            }
        } finally {
            flushesInProgress.decrementAndGet();
        }
    }

    /** Returns whether the incomplete set still contains any batch. */
    public boolean hasIncomplete() {
        boolean noneIncomplete = incomplete.isEmpty();
        return !noneIncomplete;
    }

    /**
     * Aborts all incomplete batches and clears the per-topic state.
     *
     * <p>Aborting repeats for as long as appends are in progress, then runs one final time
     * to catch anything queued by the last appends.
     */
    public void abortIncompleteBatches() {
        abortBatches();
        while (appendsInProgress()) {
            abortBatches();
        }
        // Final sweep after the last in-flight append has finished.
        abortBatches();
        topicInfoMap.clear();
    }

    /** Aborts every incomplete batch with a "closed forcefully" {@link KafkaException}. */
    private void abortBatches() {
        RuntimeException reason = new KafkaException("Producer is closed forcefully.");
        abortBatches(reason);
    }

    /**
     * Aborts every batch in a snapshot of the incomplete set.
     *
     * <p>Each batch has its appends aborted and is removed from its partition queue under that
     * queue's lock; it is then aborted with {@code reason} and deallocated outside the lock.
     *
     * @param reason exception the batches are aborted with
     */
    void abortBatches(RuntimeException reason) {
        for (ProducerBatch batch : incomplete.copyAll()) {
            Deque<ProducerBatch> partitionQueue = getDeque(batch.topicPartition);
            synchronized (partitionQueue) {
                batch.abortRecordAppends();
                partitionQueue.remove(batch);
            }
            batch.abort(reason);
            deallocate(batch);
        }
    }

    /**
     * Aborts the incomplete batches that have not been drained yet.
     *
     * <p>With a transaction manager a batch counts as undrained when it has no sequence;
     * without one, when it is not closed. Other batches are left untouched.
     *
     * @param reason exception the batches are aborted with
     */
    void abortUndrainedBatches(RuntimeException reason) {
        for (ProducerBatch batch : incomplete.copyAll()) {
            Deque<ProducerBatch> partitionQueue = getDeque(batch.topicPartition);
            boolean aborted = false;
            synchronized (partitionQueue) {
                boolean undrained = transactionManager != null ? !batch.hasSequence() : !batch.isClosed();
                if (undrained) {
                    aborted = true;
                    batch.abortRecordAppends();
                    partitionQueue.remove(batch);
                }
            }
            if (aborted) {
                batch.abort(reason);
                deallocate(batch);
            }
        }
    }

    /** Adds {@code tp} to the muted set. */
    public void mutePartition(TopicPartition tp) {
        muted.add(tp);
    }

    /** Removes {@code tp} from the muted set. */
    public void unmutePartition(TopicPartition tp) {
        muted.remove(tp);
    }

    /** Marks the accumulator closed, then closes the buffer pool. */
    public void close() {
        closed = true;
        free.close();
    }

    /**
     * Synthetic lambda body emitted as a method by the decompiler (the name points at
     * {@code append}). Creates an empty queue; the key is ignored.
     */
    private static /* synthetic */ Deque lambda$append$4(Integer k) {
        return new ArrayDeque<ProducerBatch>();
    }

    /**
     * Synthetic lambda body emitted as a method by the decompiler (the name points at
     * {@code append}). Creates the per-topic state for topic {@code k}.
     */
    private /* synthetic */ TopicInfo lambda$append$3(String k) {
        return new TopicInfo(logContext, k, batchSize);
    }

    /**
     * Two timestamps tracked per node. Both fields are volatile and are written without
     * further synchronization.
     */
    public static final class NodeLatencyStats {
        /** Set to the current time on every {@code updateNodeLatencyStats} call for the node. */
        public volatile long readyTimeMs;
        /** Set to the current time only when the update reports the node as drainable. */
        public volatile long drainTimeMs;

        /** Starts both timestamps at {@code nowMs}. */
        NodeLatencyStats(long nowMs) {
            readyTimeMs = nowMs;
            drainTimeMs = nowMs;
        }
    }

    /** Per-topic state: one batch queue per partition id plus the topic's built-in partitioner. */
    private static class TopicInfo {
        /** Batch queues keyed by partition id. */
        public final ConcurrentMap<Integer, Deque<ProducerBatch>> batches;
        public final BuiltInPartitioner builtInPartitioner;

        /**
         * @param logContext forwarded to the partitioner
         * @param topic topic this state belongs to
         * @param stickyBatchSize batch size forwarded to the partitioner
         */
        public TopicInfo(LogContext logContext, String topic, int stickyBatchSize) {
            this.batches = new CopyOnWriteMap<>();
            this.builtInPartitioner = new BuiltInPartitioner(logContext, topic, stickyBatchSize);
        }
    }

    /** Immutable holder for the outcome of a {@code ready} check. */
    public static final class ReadyCheckResult {
        /** Nodes that have at least one sendable batch. */
        public final Set<Node> readyNodes;
        /** Delay in milliseconds until readiness should be checked again. */
        public final long nextReadyCheckDelayMs;
        /** Topics that have queued data for a partition without a known leader. */
        public final Set<String> unknownLeaderTopics;

        public ReadyCheckResult(Set<Node> readyNodes,
                                long nextReadyCheckDelayMs,
                                Set<String> unknownLeaderTopics) {
            this.unknownLeaderTopics = unknownLeaderTopics;
            this.nextReadyCheckDelayMs = nextReadyCheckDelayMs;
            this.readyNodes = readyNodes;
        }
    }

    /** A {@link Callback} that can additionally be told which partition a record is appended to. */
    public static interface AppendCallbacks extends Callback {
        /**
         * Receives the partition chosen for the record.
         *
         * @param partition the partition number
         */
        void setPartition(int partition);
    }

    /** Immutable holder describing the outcome of an append attempt. */
    public static final class RecordAppendResult {
        /** Future for the appended record's metadata. */
        public final FutureRecordMetadata future;
        /** True when the queue holds more than one batch or the target batch is full. */
        public final boolean batchIsFull;
        /** True when the append created a new batch. */
        public final boolean newBatchCreated;
        /** Flag passed by the creator; no code visible in this class sets it to true. */
        public final boolean abortForNewBatch;
        /** Change in estimated batch size caused by the append. */
        public final int appendedBytes;

        public RecordAppendResult(FutureRecordMetadata future,
                                  boolean batchIsFull,
                                  boolean newBatchCreated,
                                  boolean abortForNewBatch,
                                  int appendedBytes) {
            this.appendedBytes = appendedBytes;
            this.abortForNewBatch = abortForNewBatch;
            this.newBatchCreated = newBatchCreated;
            this.batchIsFull = batchIsFull;
            this.future = future;
        }
    }

    /** Partitioner settings read by the {@code RecordAccumulator} constructor. */
    public static final class PartitionerConfig {
        private final boolean enableAdaptivePartitioning;
        private final long partitionAvailabilityTimeoutMs;

        /** Creates a config with adaptive partitioning disabled and an availability timeout of 0. */
        public PartitionerConfig() {
            this(false, 0L);
        }

        /**
         * @param enableAdaptivePartitioning whether queue sizes are collected for the partitioner
         * @param partitionAvailabilityTimeoutMs availability timeout in milliseconds; values
         *        {@code <= 0} disable node latency tracking in the accumulator
         */
        public PartitionerConfig(boolean enableAdaptivePartitioning, long partitionAvailabilityTimeoutMs) {
            this.partitionAvailabilityTimeoutMs = partitionAvailabilityTimeoutMs;
            this.enableAdaptivePartitioning = enableAdaptivePartitioning;
        }
    }
}

