/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka.internals;

import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.Callback;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.Producer;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.internals.TransactionManager;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.Metric;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.MetricName;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.Node;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.PartitionInfo;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartition;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link Producer} that delegates to a wrapped {@link KafkaProducer} and adds two things on top
 * of it:
 *
 * <ul>
 *   <li>a closing lock plus a {@code closed} flag, so that guarded operations fail with an
 *       {@link IllegalStateException} once {@link #close(Duration)} has completed;
 *   <li>reflection-based access to the wrapped producer's {@code transactionManager}, used to
 *       read the producer id / epoch and to resume a transaction from those values.
 * </ul>
 *
 * <p>All reflective helpers translate reflection failures into a {@link RuntimeException} with
 * the message {@code "Incompatible KafkaProducer version"}.
 *
 * @param <K> key type of the produced records
 * @param <V> value type of the produced records
 */
@PublicEvolving
public class FlinkKafkaInternalProducer<K, V> implements Producer<K, V> {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaInternalProducer.class);

    /** Fully qualified (shaded) name of the transaction manager's state enum. */
    private static final String TRANSACTION_MANAGER_STATE_ENUM =
            "org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.internals.TransactionManager$State";

    /** The wrapped producer every call is delegated to. */
    protected final KafkaProducer<K, V> kafkaProducer;

    /** Monitor held while closing and while running any guarded operation. */
    private final Object producerClosingLock;

    /** Set to {@code true} at the end of a successful {@link #close(Duration)}. */
    private volatile boolean closed;

    /** Value of the {@code transactional.id} property, or {@code null} if it was not set. */
    @Nullable
    protected final String transactionalId;

    /**
     * Creates the wrapped {@link KafkaProducer} from the given properties and remembers the
     * configured {@code transactional.id} (if any).
     *
     * @param properties producer configuration handed to {@link KafkaProducer}
     */
    public FlinkKafkaInternalProducer(Properties properties) {
        this.transactionalId = properties.getProperty("transactional.id");
        this.kafkaProducer = new KafkaProducer<>(properties);
        this.producerClosingLock = new Object();
        this.closed = false;
    }

    /**
     * Delegates to the wrapped producer while holding the closing lock.
     *
     * @throws IllegalStateException if this producer has already been closed
     */
    @Override
    public void initTransactions() {
        synchronized (producerClosingLock) {
            ensureNotClosed();
            kafkaProducer.initTransactions();
        }
    }

    /**
     * Delegates to the wrapped producer while holding the closing lock.
     *
     * @throws IllegalStateException if this producer has already been closed
     */
    @Override
    public void beginTransaction() throws ProducerFencedException {
        synchronized (producerClosingLock) {
            ensureNotClosed();
            kafkaProducer.beginTransaction();
        }
    }

    /**
     * Delegates to the wrapped producer while holding the closing lock.
     *
     * @throws IllegalStateException if this producer has already been closed
     */
    @Override
    public void commitTransaction() throws ProducerFencedException {
        synchronized (producerClosingLock) {
            ensureNotClosed();
            kafkaProducer.commitTransaction();
        }
    }

    /**
     * Delegates to the wrapped producer while holding the closing lock.
     *
     * @throws IllegalStateException if this producer has already been closed
     */
    @Override
    public void abortTransaction() throws ProducerFencedException {
        synchronized (producerClosingLock) {
            ensureNotClosed();
            kafkaProducer.abortTransaction();
        }
    }

    /**
     * Delegates to the wrapped producer while holding the closing lock.
     *
     * @throws IllegalStateException if this producer has already been closed
     */
    @Override
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException {
        synchronized (producerClosingLock) {
            ensureNotClosed();
            kafkaProducer.sendOffsetsToTransaction(offsets, consumerGroupId);
        }
    }

    /**
     * Delegates to the wrapped producer while holding the closing lock, exactly like the
     * {@code String}-based overload.
     *
     * @throws IllegalStateException if this producer has already been closed
     */
    @Override
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> map, ConsumerGroupMetadata consumerGroupMetadata) throws ProducerFencedException {
        synchronized (producerClosingLock) {
            ensureNotClosed();
            kafkaProducer.sendOffsetsToTransaction(map, consumerGroupMetadata);
        }
    }

    /** Delegates directly to the wrapped producer; not guarded by the closing lock. */
    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
        return kafkaProducer.send(record);
    }

    /** Delegates directly to the wrapped producer; not guarded by the closing lock. */
    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        return kafkaProducer.send(record, callback);
    }

    /**
     * Delegates to the wrapped producer while holding the closing lock.
     *
     * @throws IllegalStateException if this producer has already been closed
     */
    @Override
    public List<PartitionInfo> partitionsFor(String topic) {
        synchronized (producerClosingLock) {
            ensureNotClosed();
            return kafkaProducer.partitionsFor(topic);
        }
    }

    /** Delegates directly to the wrapped producer; not guarded by the closing lock. */
    @Override
    public Map<MetricName, ? extends Metric> metrics() {
        return kafkaProducer.metrics();
    }

    /**
     * Always rejected; callers must use {@link #close(Duration)} instead.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void close() {
        throw new UnsupportedOperationException("Close without timeout is not allowed because it can leave lingering Kafka threads.");
    }

    /**
     * Closes the wrapped producer with the given timeout while holding the closing lock and then
     * marks this instance as closed. The flag is only set if the wrapped close returns normally.
     *
     * @param duration timeout passed through to the wrapped producer
     */
    @Override
    public void close(Duration duration) {
        synchronized (producerClosingLock) {
            kafkaProducer.close(duration);
            // Building the stack trace string is costly, so only do it when it will be logged.
            if (LOG.isDebugEnabled()) {
                String stackTrace = Arrays.stream(Thread.currentThread().getStackTrace())
                        .map(StackTraceElement::toString)
                        .collect(Collectors.joining("\n"));
                LOG.debug("Closed internal KafkaProducer {}. Stacktrace: {}", System.identityHashCode(this), stackTrace);
            }
            closed = true;
        }
    }

    /**
     * Flushes the wrapped producer. If a transactional id is configured, it additionally (under
     * the closing lock) enqueues the partitions newly added to the transaction and waits for that
     * request to complete.
     *
     * @throws IllegalStateException if a transactional id is configured and this producer has
     *     already been closed
     */
    @Override
    public void flush() {
        kafkaProducer.flush();
        if (transactionalId != null) {
            synchronized (producerClosingLock) {
                ensureNotClosed();
                flushNewPartitions();
            }
        }
    }

    /**
     * Forces the wrapped producer's transaction manager, via reflection, into the
     * {@code IN_TRANSACTION} state using the supplied producer id and epoch.
     *
     * <p>The manipulation happens while holding both the closing lock and the transaction
     * manager's monitor. The statement order below is significant and must not be changed.
     *
     * @param producerId producer id to install; must be {@code >= 0}
     * @param epoch producer epoch to install; must be {@code >= 0}
     * @throws IllegalStateException if this producer is closed or an argument is negative
     * @throws RuntimeException if the reflective access fails
     */
    public void resumeTransaction(long producerId, short epoch) {
        synchronized (producerClosingLock) {
            ensureNotClosed();
            Preconditions.checkState(
                    producerId >= 0L && epoch >= 0,
                    "Incorrect values for producerId %s and epoch %s",
                    producerId,
                    epoch);
            LOG.info("Attempting to resume transaction {} with producerId {} and epoch {}", transactionalId, producerId, epoch);

            Object transactionManager = getField(kafkaProducer, "transactionManager");
            synchronized (transactionManager) {
                Object txnPartitionMap = getField(transactionManager, "txnPartitionMap");

                invoke(transactionManager, "transitionTo", getEnum(TRANSACTION_MANAGER_STATE_ENUM + ".INITIALIZING"));
                invoke(txnPartitionMap, "reset");

                setField(transactionManager, "producerIdAndEpoch", createProducerIdAndEpoch(producerId, epoch));

                invoke(transactionManager, "transitionTo", getEnum(TRANSACTION_MANAGER_STATE_ENUM + ".READY"));
                invoke(transactionManager, "transitionTo", getEnum(TRANSACTION_MANAGER_STATE_ENUM + ".IN_TRANSACTION"));
                setField(transactionManager, "transactionStarted", true);
            }
        }
    }

    /** Returns the configured {@code transactional.id}, or {@code null} if none was set. */
    @Nullable
    public String getTransactionalId() {
        return transactionalId;
    }

    /**
     * Reads {@code transactionManager.producerIdAndEpoch.producerId} from the wrapped producer
     * via reflection.
     *
     * @throws RuntimeException if the reflective access fails
     */
    public long getProducerId() {
        Object transactionManager = getField(kafkaProducer, "transactionManager");
        Object producerIdAndEpoch = getField(transactionManager, "producerIdAndEpoch");
        return (Long) getField(producerIdAndEpoch, "producerId");
    }

    /**
     * Reads {@code transactionManager.producerIdAndEpoch.epoch} from the wrapped producer via
     * reflection.
     *
     * @throws RuntimeException if the reflective access fails
     */
    public short getEpoch() {
        Object transactionManager = getField(kafkaProducer, "transactionManager");
        Object producerIdAndEpoch = getField(transactionManager, "producerIdAndEpoch");
        return (Short) getField(producerIdAndEpoch, "epoch");
    }

    /**
     * Returns the id of the node obtained by reflectively calling
     * {@code transactionManager.coordinator(CoordinatorType.TRANSACTION)}.
     *
     * @throws RuntimeException if the reflective access fails
     */
    @VisibleForTesting
    public int getTransactionCoordinatorId() {
        Object transactionManager = getField(kafkaProducer, "transactionManager");
        Node node = (Node) invoke(transactionManager, "coordinator", FindCoordinatorRequest.CoordinatorType.TRANSACTION);
        return node.id();
    }

    /** Throws {@link IllegalStateException} if {@link #close(Duration)} has completed. */
    private void ensureNotClosed() {
        if (closed) {
            throw new IllegalStateException(String.format("The producer %s has already been closed", System.identityHashCode(this)));
        }
    }

    /**
     * Reflectively instantiates the type of {@code TransactionManager.producerIdAndEpoch} through
     * its {@code (long, short)} constructor.
     *
     * @throws RuntimeException if the field or constructor is missing or cannot be invoked
     */
    private Object createProducerIdAndEpoch(long producerId, short epoch) {
        try {
            Field field = TransactionManager.class.getDeclaredField("producerIdAndEpoch");
            Class<?> clazz = field.getType();
            Constructor<?> constructor = clazz.getDeclaredConstructor(Long.TYPE, Short.TYPE);
            constructor.setAccessible(true);
            return constructor.newInstance(producerId, epoch);
        } catch (IllegalAccessException
                | InstantiationException
                | NoSuchFieldException
                | NoSuchMethodException
                | InvocationTargetException e) {
            throw new RuntimeException("Incompatible KafkaProducer version", e);
        }
    }

    /**
     * Enqueues the pending new partitions, wakes up the wrapped producer's {@code sender} and
     * blocks until the resulting request has completed.
     */
    private void flushNewPartitions() {
        LOG.info("Flushing new partitions");
        TransactionalRequestResult result = enqueueNewPartitions();
        Object sender = getField(kafkaProducer, "sender");
        invoke(sender, "wakeup");
        result.await();
    }

    /**
     * While holding the transaction manager's monitor: if {@code newPartitionsInTransaction} is
     * non-empty, obtains the add-partitions handler, enqueues it and returns the handler's
     * {@code result}; otherwise returns a fresh result that is already marked done.
     *
     * @return the result to wait on
     * @throws RuntimeException if the reflective access fails
     */
    private TransactionalRequestResult enqueueNewPartitions() {
        Object transactionManager = getField(kafkaProducer, "transactionManager");
        synchronized (transactionManager) {
            Object newPartitionsInTransaction = getField(transactionManager, "newPartitionsInTransaction");
            Object isEmpty = invoke(newPartitionsInTransaction, "isEmpty");
            boolean hasNewPartitions = isEmpty instanceof Boolean && !((Boolean) isEmpty);

            if (!hasNewPartitions) {
                TransactionalRequestResult done = new TransactionalRequestResult("AddPartitionsToTxn");
                done.done();
                return done;
            }

            Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
            // enqueueRequest and the "result" field are looked up against the handler's
            // superclass rather than its runtime class.
            Class<?> handlerSuperclass = txnRequestHandler.getClass().getSuperclass();
            invoke(transactionManager, "enqueueRequest", new Class<?>[] {handlerSuperclass}, new Object[] {txnRequestHandler});
            return (TransactionalRequestResult) getField(txnRequestHandler, handlerSuperclass, "result");
        }
    }

    /**
     * Resolves an enum constant from a string of the form {@code <enum class name>.<CONSTANT>},
     * splitting at the last dot.
     *
     * @param enumFullName class name and constant name joined by a dot
     * @return the constant, or {@code null} if {@code enumFullName} contains no dot to split on
     * @throws RuntimeException if the enum class cannot be loaded
     * @throws IllegalArgumentException if the class has no constant with that name (propagated
     *     from {@link Enum#valueOf(Class, String)})
     */
    protected static Enum<?> getEnum(String enumFullName) {
        String[] x = enumFullName.split("\\.(?=[^\\.]+$)");
        if (x.length != 2) {
            return null;
        }
        String enumClassName = x[0];
        String enumName = x[1];
        try {
            // The class is only known by name at runtime, so a raw Enum type is unavoidable here.
            @SuppressWarnings({"unchecked", "rawtypes"})
            Enum<?> constant = Enum.valueOf((Class<Enum>) Class.forName(enumClassName), enumName);
            return constant;
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("Incompatible KafkaProducer version", e);
        }
    }

    /**
     * Reflectively invokes a declared method, deriving the parameter types from the runtime
     * classes of {@code args}. Consequently no argument may be {@code null}, and the declared
     * parameter types must equal the arguments' runtime classes exactly.
     *
     * @param object target instance; the method is looked up on its runtime class
     * @param methodName name of the declared method
     * @param args non-null arguments
     * @return the method's return value
     * @throws RuntimeException if the method is missing or the invocation fails
     */
    protected static Object invoke(Object object, String methodName, Object... args) {
        Class<?>[] argTypes = new Class<?>[args.length];
        for (int i = 0; i < args.length; ++i) {
            argTypes[i] = args[i].getClass();
        }
        return invoke(object, methodName, argTypes, args);
    }

    /**
     * Reflectively invokes the method declared on {@code object}'s runtime class with the given
     * explicit parameter types, making it accessible first.
     *
     * @throws RuntimeException if the method is missing or the invocation fails
     */
    private static Object invoke(Object object, String methodName, Class<?>[] argTypes, Object[] args) {
        try {
            Method method = object.getClass().getDeclaredMethod(methodName, argTypes);
            method.setAccessible(true);
            return method.invoke(object, args);
        } catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
            throw new RuntimeException("Incompatible KafkaProducer version", e);
        }
    }

    /**
     * Reads a field declared on {@code object}'s runtime class.
     *
     * @throws RuntimeException if the field is missing or inaccessible
     */
    protected static Object getField(Object object, String fieldName) {
        return getField(object, object.getClass(), fieldName);
    }

    /**
     * Reads the field {@code fieldName} declared on {@code clazz} from {@code object}, making it
     * accessible first.
     *
     * @throws RuntimeException if the field is missing or inaccessible
     */
    private static Object getField(Object object, Class<?> clazz, String fieldName) {
        try {
            Field field = clazz.getDeclaredField(fieldName);
            field.setAccessible(true);
            return field.get(object);
        } catch (IllegalAccessException | NoSuchFieldException e) {
            throw new RuntimeException("Incompatible KafkaProducer version", e);
        }
    }

    /**
     * Writes {@code value} into the field {@code fieldName} declared on {@code object}'s runtime
     * class, making it accessible first.
     *
     * @throws RuntimeException if the field is missing or inaccessible
     */
    protected static void setField(Object object, String fieldName, Object value) {
        try {
            Field field = object.getClass().getDeclaredField(fieldName);
            field.setAccessible(true);
            field.set(object, value);
        } catch (IllegalAccessException | NoSuchFieldException e) {
            throw new RuntimeException("Incompatible KafkaProducer version", e);
        }
    }
}

