/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink.committables;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestState;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector;
import org.apache.flink.streaming.runtime.operators.sink.committables.SinkV1CommittableDeserializer;
import org.apache.flink.streaming.runtime.operators.sink.committables.SubtaskCommittableManager;
import org.apache.flink.util.Preconditions;

/**
 * Serializes and deserializes a {@link CommittableCollector}.
 *
 * <p>Layout written by {@link #serialize(CommittableCollector)} (format version 2):
 *
 * <pre>
 * int   magic number
 * list  checkpoints, each:
 *         long  checkpoint id
 *         list  subtasks, each:
 *                 list  commit requests, each:
 *                         versioned committable (via the user-supplied serializer)
 *                         int  number of retries
 *                         int  ordinal of the request state
 *                 int   number of committables
 *                 int   number of drained committables
 *                 int   number of failed committables
 * </pre>
 *
 * <p>Every "list" above is written with
 * {@code SimpleVersionedSerialization.writeVersionAndSerializeList}. Format version 1 carries no
 * magic number and is read through {@link SinkV1CommittableDeserializer}.
 *
 * @param <CommT> type of the committables held by the collector
 */
@Internal
public final class CommittableCollectorSerializer<CommT>
        implements SimpleVersionedSerializer<CommittableCollector<CommT>> {

    /** Marker at the start of every version-2 payload; identical bits to -1189141204. */
    private static final int MAGIC_NUMBER = 0xB91F252C;

    /** Format version handled by {@link #deserializeV1(DataInputView)}. */
    private static final int LEGACY_VERSION = 1;

    /** Format version produced by {@link #serialize(CommittableCollector)}. */
    private static final int CURRENT_VERSION = 2;

    /** Initial capacity, in bytes, of every output buffer created by this serializer. */
    private static final int INITIAL_BUFFER_SIZE = 256;

    private final SimpleVersionedSerializer<CommT> committableSerializer;
    private final int subtaskId;
    private final int numberOfSubtasks;

    /**
     * Creates the serializer.
     *
     * @param committableSerializer serializer for the individual committables; must not be null
     * @param subtaskId subtask id handed to every object restored by this serializer
     * @param numberOfSubtasks number of subtasks handed to every restored collector
     */
    public CommittableCollectorSerializer(
            SimpleVersionedSerializer<CommT> committableSerializer,
            int subtaskId,
            int numberOfSubtasks) {
        this.committableSerializer = Preconditions.checkNotNull(committableSerializer);
        this.subtaskId = subtaskId;
        this.numberOfSubtasks = numberOfSubtasks;
    }

    @Override
    public int getVersion() {
        return CURRENT_VERSION;
    }

    /**
     * Writes the magic number followed by the version-2 encoding of the collector.
     *
     * @param committableCollector collector to serialize
     * @return a copy of the written bytes
     * @throws IOException if any nested serializer fails
     */
    @Override
    public byte[] serialize(CommittableCollector<CommT> committableCollector) throws IOException {
        DataOutputSerializer out = new DataOutputSerializer(INITIAL_BUFFER_SIZE);
        out.writeInt(MAGIC_NUMBER);
        serializeV2(committableCollector, out);
        return out.getCopyOfBuffer();
    }

    /**
     * Restores a collector from bytes written with format version 1 or 2.
     *
     * @param version format version the bytes were written with
     * @param serialized the serialized collector
     * @return the restored collector
     * @throws IOException if the version is unknown, the magic number of a version-2 payload does
     *     not match, or the payload cannot be decoded
     */
    @Override
    public CommittableCollector<CommT> deserialize(int version, byte[] serialized)
            throws IOException {
        DataInputDeserializer in = new DataInputDeserializer(serialized);
        switch (version) {
            case LEGACY_VERSION:
                return deserializeV1(in);
            case CURRENT_VERSION:
                validateMagicNumber(in);
                return deserializeV2(in);
            default:
                throw new IOException("Unrecognized version or corrupt state: " + version);
        }
    }

    /** Reads a version-1 payload: a list of committables wrapped into a legacy collector. */
    private CommittableCollector<CommT> deserializeV1(DataInputView in) throws IOException {
        return CommittableCollector.ofLegacy(
                SinkV1CommittableDeserializer.readVersionAndDeserializeList(
                        committableSerializer, in));
    }

    /** Writes all checkpoint managers of the collector as one versioned list. */
    private void serializeV2(
            CommittableCollector<CommT> committableCollector, DataOutputView dataOutputView)
            throws IOException {
        SimpleVersionedSerialization.writeVersionAndSerializeList(
                new CheckpointSimpleVersionedSerializer(),
                new ArrayList<>(committableCollector.getCheckpointCommittables()),
                dataOutputView);
    }

    /** Reads the versioned list of checkpoint managers and indexes them by checkpoint id. */
    private CommittableCollector<CommT> deserializeV2(DataInputDeserializer in)
            throws IOException {
        List<CheckpointCommittableManagerImpl<CommT>> checkpoints =
                SimpleVersionedSerialization.readVersionAndDeserializeList(
                        new CheckpointSimpleVersionedSerializer(), in);
        return new CommittableCollector<>(
                checkpoints.stream()
                        .collect(
                                Collectors.toMap(
                                        CheckpointCommittableManagerImpl::getCheckpointId,
                                        checkpoint -> checkpoint)),
                subtaskId,
                numberOfSubtasks);
    }

    /**
     * Consumes one int from the input and verifies it is the expected magic number.
     *
     * @throws IOException if the value read differs from {@link #MAGIC_NUMBER}
     */
    private static void validateMagicNumber(DataInputView in) throws IOException {
        int magicNumber = in.readInt();
        if (magicNumber != MAGIC_NUMBER) {
            throw new IOException(
                    String.format("Corrupt data: Unexpected magic number %08X", magicNumber));
        }
    }

    /**
     * Serializer for one {@link SubtaskCommittableManager}: its commit requests followed by the
     * three counters. The {@code version} passed to {@code deserialize} is not inspected.
     */
    private class SubtaskSimpleVersionedSerializer
            implements SimpleVersionedSerializer<SubtaskCommittableManager<CommT>> {

        /**
         * Checkpoint id handed to restored managers. The id is not part of the subtask bytes, so
         * it has to be supplied by whoever reads the enclosing checkpoint; {@code null} for
         * instances that are only used for writing.
         */
        private final Long checkpointId;

        /** Creates an instance that can only serialize. */
        private SubtaskSimpleVersionedSerializer() {
            this.checkpointId = null;
        }

        /** Creates an instance whose restored managers receive the given checkpoint id. */
        private SubtaskSimpleVersionedSerializer(long checkpointId) {
            this.checkpointId = checkpointId;
        }

        @Override
        public int getVersion() {
            return 0;
        }

        @Override
        public byte[] serialize(SubtaskCommittableManager<CommT> subtask) throws IOException {
            DataOutputSerializer out = new DataOutputSerializer(INITIAL_BUFFER_SIZE);
            SimpleVersionedSerialization.writeVersionAndSerializeList(
                    new RequestSimpleVersionedSerializer(),
                    new ArrayList<>(subtask.getRequests()),
                    out);
            out.writeInt(subtask.getNumCommittables());
            out.writeInt(subtask.getNumDrained());
            out.writeInt(subtask.getNumFailed());
            return out.getCopyOfBuffer();
        }

        @Override
        public SubtaskCommittableManager<CommT> deserialize(int version, byte[] serialized)
                throws IOException {
            Long restoredCheckpointId =
                    Preconditions.checkNotNull(
                            checkpointId,
                            "A checkpoint id is required to deserialize a subtask manager.");
            DataInputDeserializer in = new DataInputDeserializer(serialized);
            List<CommitRequestImpl<CommT>> requests =
                    SimpleVersionedSerialization.readVersionAndDeserializeList(
                            new RequestSimpleVersionedSerializer(), in);
            // Read the counters in exactly the order serialize() wrote them.
            int numCommittables = in.readInt();
            int numDrained = in.readInt();
            int numFailed = in.readInt();
            // NOTE(review): the trailing constructor argument is taken to be the checkpoint id
            // (it used to be a hard-coded 1L) -- confirm against SubtaskCommittableManager.
            return new SubtaskCommittableManager<>(
                    requests,
                    numCommittables,
                    numDrained,
                    numFailed,
                    subtaskId,
                    restoredCheckpointId);
        }

        /**
         * Serializer for one {@link CommitRequestImpl}: the versioned committable, the retry
         * count and the ordinal of the request state. The {@code version} passed to
         * {@code deserialize} is not inspected.
         */
        private class RequestSimpleVersionedSerializer
                implements SimpleVersionedSerializer<CommitRequestImpl<CommT>> {

            private RequestSimpleVersionedSerializer() {}

            @Override
            public int getVersion() {
                return 0;
            }

            @Override
            public byte[] serialize(CommitRequestImpl<CommT> request) throws IOException {
                DataOutputSerializer out = new DataOutputSerializer(INITIAL_BUFFER_SIZE);
                SimpleVersionedSerialization.writeVersionAndSerialize(
                        committableSerializer, request.getCommittable(), out);
                out.writeInt(request.getNumberOfRetries());
                // The state is persisted by ordinal, so the enum's constant order is part of
                // the wire format.
                out.writeInt(request.getState().ordinal());
                return out.getCopyOfBuffer();
            }

            @Override
            public CommitRequestImpl<CommT> deserialize(int version, byte[] serialized)
                    throws IOException {
                DataInputDeserializer in = new DataInputDeserializer(serialized);
                CommT committable =
                        SimpleVersionedSerialization.readVersionAndDeSerialize(
                                committableSerializer, in);
                int numberOfRetries = in.readInt();
                int stateOrdinal = in.readInt();
                CommitRequestState[] states = CommitRequestState.values();
                // Report an out-of-range ordinal as corrupt data instead of letting the array
                // access fail with an unchecked exception.
                if (stateOrdinal < 0 || stateOrdinal >= states.length) {
                    throw new IOException(
                            "Corrupt data: Unexpected commit request state ordinal "
                                    + stateOrdinal);
                }
                return new CommitRequestImpl<>(committable, numberOfRetries, states[stateOrdinal]);
            }
        }
    }

    /**
     * Serializer for one {@link CheckpointCommittableManagerImpl}: the checkpoint id followed by
     * the versioned list of its subtask managers. The {@code version} passed to
     * {@code deserialize} is not inspected.
     */
    private class CheckpointSimpleVersionedSerializer
            implements SimpleVersionedSerializer<CheckpointCommittableManagerImpl<CommT>> {

        private CheckpointSimpleVersionedSerializer() {}

        @Override
        public int getVersion() {
            return 0;
        }

        @Override
        public byte[] serialize(CheckpointCommittableManagerImpl<CommT> checkpoint)
                throws IOException {
            DataOutputSerializer out = new DataOutputSerializer(INITIAL_BUFFER_SIZE);
            out.writeLong(checkpoint.getCheckpointId());
            SimpleVersionedSerialization.writeVersionAndSerializeList(
                    new SubtaskSimpleVersionedSerializer(),
                    new ArrayList<>(checkpoint.getSubtaskCommittableManagers()),
                    out);
            return out.getCopyOfBuffer();
        }

        @Override
        public CheckpointCommittableManagerImpl<CommT> deserialize(int version, byte[] serialized)
                throws IOException {
            DataInputDeserializer in = new DataInputDeserializer(serialized);
            long checkpointId = in.readLong();
            // Hand the id just read to the subtask serializer so that the restored subtask
            // managers carry the id of the checkpoint they belong to.
            List<SubtaskCommittableManager<CommT>> subtasks =
                    SimpleVersionedSerialization.readVersionAndDeserializeList(
                            new SubtaskSimpleVersionedSerializer(checkpointId), in);
            return new CheckpointCommittableManagerImpl<>(
                    subtasks.stream()
                            .collect(
                                    Collectors.toMap(
                                            SubtaskCommittableManager::getSubtaskId,
                                            subtask -> subtask)),
                    checkpointId);
        }
    }
}

