/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.file.mergetree;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.data.DataFileReader;
import org.apache.flink.table.store.file.mergetree.SortedRun;
import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.SortMergeReader;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.types.RowKind;

public class MergeTreeReader
implements RecordReader<KeyValue> {
    private final RecordReader<KeyValue> reader;
    private final boolean dropDelete;

    public MergeTreeReader(List<List<SortedRun>> sections, boolean dropDelete, DataFileReader dataFileReader, Comparator<RowData> userKeyComparator, MergeFunction mergeFunction) throws IOException {
        this.dropDelete = dropDelete;
        ArrayList readers = new ArrayList();
        for (List<SortedRun> section : sections) {
            readers.add(() -> MergeTreeReader.readerForSection(section, dataFileReader, userKeyComparator, mergeFunction));
        }
        this.reader = ConcatRecordReader.create(readers);
    }

    @Override
    @Nullable
    public RecordReader.RecordIterator<KeyValue> readBatch() throws IOException {
        final RecordReader.RecordIterator<KeyValue> batch = this.reader.readBatch();
        if (!this.dropDelete) {
            return batch;
        }
        if (batch == null) {
            return null;
        }
        return new RecordReader.RecordIterator<KeyValue>(){

            @Override
            public KeyValue next() throws IOException {
                KeyValue kv;
                do {
                    if ((kv = (KeyValue)batch.next()) != null) continue;
                    return null;
                } while (kv.valueKind() != RowKind.INSERT && kv.valueKind() != RowKind.UPDATE_AFTER);
                return kv;
            }

            @Override
            public void releaseBatch() {
                batch.releaseBatch();
            }
        };
    }

    @Override
    public void close() throws IOException {
        this.reader.close();
    }

    public static RecordReader<KeyValue> readerForSection(List<SortedRun> section, DataFileReader dataFileReader, Comparator<RowData> userKeyComparator, MergeFunction mergeFunction) throws IOException {
        ArrayList<RecordReader<KeyValue>> readers = new ArrayList<RecordReader<KeyValue>>();
        for (SortedRun run : section) {
            readers.add(MergeTreeReader.readerForRun(run, dataFileReader));
        }
        return SortMergeReader.create(readers, userKeyComparator, mergeFunction);
    }

    public static RecordReader<KeyValue> readerForRun(SortedRun run, DataFileReader dataFileReader) throws IOException {
        ArrayList readers = new ArrayList();
        for (DataFileMeta file : run.files()) {
            readers.add(() -> dataFileReader.read(file.fileName()));
        }
        return ConcatRecordReader.create(readers);
    }
}

