/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka.table;

import java.io.IOException;
import java.util.Collection;
import java.util.function.Function;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.connector.kafka.sink.TwoPhaseCommittingStatefulSink;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.connectors.kafka.table.ReducingUpsertWriter;
import org.apache.flink.streaming.connectors.kafka.table.SinkBufferFlushMode;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.function.SerializableFunction;

class ReducingUpsertSink<WriterState, Comm>
implements TwoPhaseCommittingStatefulSink<RowData, WriterState, Comm> {
    private final TwoPhaseCommittingStatefulSink<RowData, WriterState, Comm> wrappedSink;
    private final DataType physicalDataType;
    private final int[] keyProjection;
    private final SinkBufferFlushMode bufferFlushMode;
    private final SerializableFunction<RowData, RowData> valueCopyFunction;

    ReducingUpsertSink(TwoPhaseCommittingStatefulSink<RowData, WriterState, Comm> wrappedSink, DataType physicalDataType, int[] keyProjection, SinkBufferFlushMode bufferFlushMode, SerializableFunction<RowData, RowData> valueCopyFunction) {
        this.wrappedSink = wrappedSink;
        this.physicalDataType = physicalDataType;
        this.keyProjection = keyProjection;
        this.bufferFlushMode = bufferFlushMode;
        this.valueCopyFunction = valueCopyFunction;
    }

    @Override
    public TwoPhaseCommittingStatefulSink.PrecommittingStatefulSinkWriter<RowData, WriterState, Comm> createWriter(Sink.InitContext context) throws IOException {
        return new ReducingUpsertWriter(this.wrappedSink.createWriter(context), this.physicalDataType, this.keyProjection, this.bufferFlushMode, context.getProcessingTimeService(), (Function<RowData, RowData>)this.valueCopyFunction);
    }

    public Committer<Comm> createCommitter() throws IOException {
        return this.wrappedSink.createCommitter();
    }

    public SimpleVersionedSerializer<Comm> getCommittableSerializer() {
        return this.wrappedSink.getCommittableSerializer();
    }

    @Override
    public TwoPhaseCommittingStatefulSink.PrecommittingStatefulSinkWriter<RowData, WriterState, Comm> restoreWriter(Sink.InitContext context, Collection<WriterState> recoveredState) throws IOException {
        StatefulSink.StatefulSinkWriter wrappedWriter = this.wrappedSink.restoreWriter(context, (Collection)recoveredState);
        return new ReducingUpsertWriter(wrappedWriter, this.physicalDataType, this.keyProjection, this.bufferFlushMode, context.getProcessingTimeService(), (Function<RowData, RowData>)this.valueCopyFunction);
    }

    public SimpleVersionedSerializer<WriterState> getWriterStateSerializer() {
        return this.wrappedSink.getWriterStateSerializer();
    }
}

