/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.sql.spark.leasemanager;

import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.function.Predicate;
import lombok.Generated;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.spark.dispatcher.model.JobType;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.leasemanager.ConcurrencyLimitExceededException;
import org.opensearch.sql.spark.leasemanager.LeaseManager;
import org.opensearch.sql.spark.leasemanager.model.LeaseRequest;

public class DefaultLeaseManager
implements LeaseManager {
    private final List<Rule<LeaseRequest>> concurrentLimitRules;
    private final Settings settings;
    private final StateStore stateStore;

    public DefaultLeaseManager(Settings settings, StateStore stateStore) {
        this.settings = settings;
        this.stateStore = stateStore;
        this.concurrentLimitRules = Arrays.asList(new ConcurrentSessionRule(settings, stateStore), new ConcurrentRefreshJobRule(settings, stateStore));
    }

    @Override
    public void borrow(LeaseRequest request) {
        for (Rule<LeaseRequest> rule : this.concurrentLimitRules) {
            if (rule.test(request)) continue;
            throw new ConcurrencyLimitExceededException(rule.description());
        }
    }

    static interface Rule<T>
    extends Predicate<T> {
        public String description();
    }

    public static class ConcurrentSessionRule
    implements Rule<LeaseRequest> {
        private final Settings settings;
        private final StateStore stateStore;

        @Override
        public String description() {
            return String.format(Locale.ROOT, "domain concurrent active session can not exceed %d", this.sessionMaxLimit());
        }

        @Override
        public boolean test(LeaseRequest leaseRequest) {
            if (leaseRequest.getJobType() != JobType.INTERACTIVE) {
                return true;
            }
            return StateStore.activeSessionsCount(this.stateStore, "*").get() < (long)this.sessionMaxLimit();
        }

        public int sessionMaxLimit() {
            return (Integer)this.settings.getSettingValue(Settings.Key.SPARK_EXECUTION_SESSION_LIMIT);
        }

        @Generated
        public ConcurrentSessionRule(Settings settings, StateStore stateStore) {
            this.settings = settings;
            this.stateStore = stateStore;
        }
    }

    public static class ConcurrentRefreshJobRule
    implements Rule<LeaseRequest> {
        private final Settings settings;
        private final StateStore stateStore;

        @Override
        public String description() {
            return String.format(Locale.ROOT, "domain concurrent refresh job can not exceed %d", this.refreshJobLimit());
        }

        @Override
        public boolean test(LeaseRequest leaseRequest) {
            if (leaseRequest.getJobType() != JobType.REFRESH && leaseRequest.getJobType() != JobType.STREAMING) {
                return true;
            }
            return StateStore.activeRefreshJobCount(this.stateStore, "*").get() < (long)this.refreshJobLimit();
        }

        public int refreshJobLimit() {
            return (Integer)this.settings.getSettingValue(Settings.Key.SPARK_EXECUTION_REFRESH_JOB_LIMIT);
        }

        @Generated
        public ConcurrentRefreshJobRule(Settings settings, StateStore stateStore) {
            this.settings = settings;
            this.stateStore = stateStore;
        }
    }
}

