/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.functions;

import java.io.IOException;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.runtime.functions.CleanupState;

public abstract class KeyedProcessFunctionWithCleanupState<K, IN, OUT>
extends KeyedProcessFunction<K, IN, OUT>
implements CleanupState {
    private static final long serialVersionUID = 2084560869233898457L;
    private final long minRetentionTime;
    private final long maxRetentionTime;
    protected final boolean stateCleaningEnabled;
    private ValueState<Long> cleanupTimeState;

    public KeyedProcessFunctionWithCleanupState(long minRetentionTime, long maxRetentionTime) {
        this.minRetentionTime = minRetentionTime;
        this.maxRetentionTime = maxRetentionTime;
        this.stateCleaningEnabled = minRetentionTime > 1L;
    }

    protected void initCleanupTimeState(String stateName) {
        if (this.stateCleaningEnabled) {
            ValueStateDescriptor<Long> inputCntDescriptor = new ValueStateDescriptor<Long>(stateName, Types.LONG);
            this.cleanupTimeState = this.getRuntimeContext().getState(inputCntDescriptor);
        }
    }

    protected void registerProcessingCleanupTimer(KeyedProcessFunction.Context ctx, long currentTime) throws Exception {
        if (this.stateCleaningEnabled) {
            this.registerProcessingCleanupTimer(this.cleanupTimeState, currentTime, this.minRetentionTime, this.maxRetentionTime, ctx.timerService());
        }
    }

    protected boolean isProcessingTimeTimer(KeyedProcessFunction.OnTimerContext ctx) {
        return ctx.timeDomain() == TimeDomain.PROCESSING_TIME;
    }

    protected void cleanupState(State ... states) {
        for (State state : states) {
            state.clear();
        }
        this.cleanupTimeState.clear();
    }

    protected Boolean needToCleanupState(Long timestamp) throws IOException {
        if (this.stateCleaningEnabled) {
            Long cleanupTime = this.cleanupTimeState.value();
            return timestamp.equals(cleanupTime);
        }
        return false;
    }
}

