/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.file.table;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.src.util.RecordAndPosition;

/**
 * A {@link BulkFormat} decorator that stops handing out records once a row limit is reached.
 *
 * <p>All readers created by one instance of this format share a single {@link AtomicLong}
 * counter. Every record returned by any of those readers increments the counter; as soon as
 * the counter is greater than or equal to {@code limit}, readers report end of input by
 * returning {@code null}.
 *
 * <p>The limit check and the increment are two separate operations on the counter, so when
 * several readers iterate at the same time the total number of records handed out can exceed
 * {@code limit}.
 *
 * @param <T> type of the records produced by the wrapped format
 * @param <SplitT> type of the file source split consumed by the wrapped format
 */
@Internal
public class LimitableBulkFormat<T, SplitT extends FileSourceSplit>
        implements BulkFormat<T, SplitT> {
    private static final long serialVersionUID = 1L;

    /** The wrapped format that performs the actual reading. */
    private final BulkFormat<T, SplitT> format;

    /** Number of records after which readers stop returning data. */
    private final long limit;

    /**
     * Counter shared by every reader created from this instance. It is {@code transient} and
     * therefore not part of the serialized form; it is created lazily by
     * {@link #wrapReader(BulkFormat.Reader)}.
     */
    @Nullable
    private transient AtomicLong globalNumberRead;

    private LimitableBulkFormat(BulkFormat<T, SplitT> format, long limit) {
        this.format = format;
        this.limit = limit;
    }

    /**
     * Creates a limited reader for the given split.
     *
     * <p>If the limit has already been reached, the wrapped format is not asked for a reader at
     * all and the returned reader has no delegate.
     *
     * @param config configuration forwarded to the wrapped format
     * @param split split forwarded to the wrapped format
     * @return a reader that enforces the limit
     * @throws IOException if the wrapped format fails to create its reader
     */
    @Override
    public BulkFormat.Reader<T> createReader(Configuration config, SplitT split) throws IOException {
        BulkFormat.Reader<T> reader = this.reachLimit() ? null : this.format.createReader(config, split);
        return this.wrapReader(reader);
    }

    /**
     * Restores a limited reader for the given split.
     *
     * <p>If the limit has already been reached, the wrapped format is not asked for a reader at
     * all and the returned reader has no delegate.
     *
     * @param config configuration forwarded to the wrapped format
     * @param split split forwarded to the wrapped format
     * @return a reader that enforces the limit
     * @throws IOException if the wrapped format fails to restore its reader
     */
    @Override
    public BulkFormat.Reader<T> restoreReader(Configuration config, SplitT split) throws IOException {
        BulkFormat.Reader<T> reader = this.reachLimit() ? null : this.format.restoreReader(config, split);
        return this.wrapReader(reader);
    }

    /**
     * Exposes the shared counter for tests.
     *
     * @return the shared counter, or {@code null} if no reader has been created yet
     */
    @VisibleForTesting
    AtomicLong globalNumberRead() {
        return this.globalNumberRead;
    }

    /**
     * Wraps {@code reader} in a {@link LimitableReader} bound to the shared counter, creating the
     * counter on first use. Synchronized so that the counter is only created once.
     *
     * @param reader the delegate reader, or {@code null} if the limit was already reached
     * @return the limiting reader
     */
    private synchronized BulkFormat.Reader<T> wrapReader(@Nullable BulkFormat.Reader<T> reader) {
        if (this.globalNumberRead == null) {
            this.globalNumberRead = new AtomicLong(0L);
        }
        return new LimitableReader<>(reader, this.globalNumberRead, this.limit);
    }

    /**
     * Tells whether the shared counter exists and has reached the limit.
     *
     * @return {@code true} if the counter is present and {@code >= limit}
     */
    private boolean reachLimit() {
        // Read the field once so the null check and the comparison see the same reference.
        final AtomicLong counter = this.globalNumberRead;
        return counter != null && counter.get() >= this.limit;
    }

    @Override
    public boolean isSplittable() {
        return this.format.isSplittable();
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return this.format.getProducedType();
    }

    /**
     * Wraps {@code format} with a limit, or returns it unchanged when no limit is given.
     *
     * @param format the format to wrap
     * @param limit the maximum number of records, or {@code null} for no limit
     * @return {@code format} itself if {@code limit} is {@code null}, otherwise a limiting wrapper
     */
    public static <T, SplitT extends FileSourceSplit> BulkFormat<T, SplitT> create(BulkFormat<T, SplitT> format, Long limit) {
        return limit == null ? format : new LimitableBulkFormat<T, SplitT>(format, limit);
    }

    /**
     * Reader that delegates to another reader until the shared counter reaches the limit.
     */
    private static class LimitableReader<T>
            implements BulkFormat.Reader<T> {
        /** Delegate reader; {@code null} when the limit was already reached at creation time. */
        @Nullable
        private final BulkFormat.Reader<T> reader;

        /** Counter of records handed out, shared with the enclosing format's other readers. */
        private final AtomicLong numRead;

        private final long limit;

        private LimitableReader(@Nullable BulkFormat.Reader<T> reader, AtomicLong numRead, long limit) {
            this.reader = reader;
            this.numRead = numRead;
            this.limit = limit;
        }

        private boolean reachLimit() {
            return this.numRead.get() >= this.limit;
        }

        /**
         * Reads the next batch from the delegate, wrapped so that iteration stops at the limit.
         *
         * @return the next batch, or {@code null} if there is no delegate, the limit has been
         *     reached, or the delegate has no more batches
         * @throws IOException if the delegate fails while the limit has not been reached
         */
        @Override
        @Nullable
        public BulkFormat.RecordIterator<T> readBatch() throws IOException {
            // A reader without a delegate has nothing to read; treat it as end of input
            // instead of relying on the limit check to avoid a NullPointerException.
            if (this.reader == null || this.reachLimit()) {
                return null;
            }
            try {
                BulkFormat.RecordIterator<T> batch = this.reader.readBatch();
                return batch == null ? null : new LimitableIterator(batch);
            }
            catch (Exception e) {
                // A failure after the limit was reached is reported as end of input rather
                // than as an error. NOTE(review): presumably this covers the delegate being
                // closed by another thread once the limit is hit - confirm.
                if (this.reachLimit()) {
                    return null;
                }
                throw e;
            }
        }

        /**
         * Closes the delegate reader, if there is one.
         *
         * @throws IOException if closing the delegate fails
         */
        @Override
        public void close() throws IOException {
            if (this.reader != null) {
                this.reader.close();
            }
        }

        /**
         * Iterator over one batch that counts every returned record and stops at the limit.
         */
        private class LimitableIterator
                implements BulkFormat.RecordIterator<T> {
            private final BulkFormat.RecordIterator<T> iterator;

            private LimitableIterator(BulkFormat.RecordIterator<T> iterator) {
                this.iterator = iterator;
            }

            /**
             * Returns the next record of the batch and counts it against the limit.
             *
             * @return the next record, or {@code null} if the limit has been reached or the
             *     underlying batch is exhausted
             */
            @Override
            @Nullable
            public RecordAndPosition<T> next() {
                if (LimitableReader.this.reachLimit()) {
                    return null;
                }
                RecordAndPosition<T> ret = this.iterator.next();
                // Only records actually handed out count towards the limit.
                if (ret != null) {
                    LimitableReader.this.numRead.incrementAndGet();
                }
                return ret;
            }

            @Override
            public void releaseBatch() {
                this.iterator.releaseBatch();
            }
        }
    }
}

