/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.resourcemanager.reservation;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.PlanFollower;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ReservationQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CapacitySchedulerPlanFollower
implements PlanFollower {
    private static final Logger LOG = LoggerFactory.getLogger(CapacitySchedulerPlanFollower.class);
    private Collection<Plan> plans = new ArrayList<Plan>();
    private Clock clock;
    private CapacityScheduler scheduler;

    @Override
    public void init(Clock clock, ResourceScheduler sched, Collection<Plan> plans) {
        LOG.info("Initializing Plan Follower Policy:" + this.getClass().getCanonicalName());
        if (!(sched instanceof CapacityScheduler)) {
            throw new YarnRuntimeException("CapacitySchedulerPlanFollower can only work with CapacityScheduler");
        }
        this.clock = clock;
        this.scheduler = (CapacityScheduler)sched;
        this.plans.addAll(plans);
    }

    @Override
    public synchronized void run() {
        for (Plan plan : this.plans) {
            this.synchronizePlan(plan);
        }
    }

    @Override
    public synchronized void synchronizePlan(Plan plan) {
        String defReservationQueue;
        CSQueue queue;
        String planQueueName = plan.getQueueName();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Running plan follower edit policy for plan: " + planQueueName);
        }
        long step = plan.getStep();
        long now = this.clock.getTime();
        if (now % step != 0L) {
            now += step - now % step;
        }
        if (!((queue = this.scheduler.getQueue(planQueueName)) instanceof PlanQueue)) {
            LOG.error("The Plan is not an PlanQueue!");
            return;
        }
        PlanQueue planQueue = (PlanQueue)queue;
        Resource clusterResources = this.scheduler.getClusterResource();
        float planAbsCap = planQueue.getAbsoluteCapacity();
        Resource planResources = Resources.multiply((Resource)clusterResources, (double)planAbsCap);
        plan.setTotalCapacity(planResources);
        Set<ReservationAllocation> currentReservations = plan.getReservationsAtTime(now);
        HashSet<String> curReservationNames = new HashSet<String>();
        Resource reservedResources = Resource.newInstance((int)0, (int)0);
        int numRes = 0;
        if (currentReservations != null) {
            numRes = currentReservations.size();
            for (ReservationAllocation reservation : currentReservations) {
                curReservationNames.add(reservation.getReservationId().toString());
                Resources.addTo((Resource)reservedResources, (Resource)reservation.getResourcesAtTime(now));
            }
        }
        if (this.scheduler.getQueue(defReservationQueue = planQueueName + "-default") == null) {
            try {
                ReservationQueue defQueue = new ReservationQueue(this.scheduler, defReservationQueue, planQueue);
                this.scheduler.addQueue(defQueue);
            }
            catch (SchedulerDynamicEditException e) {
                LOG.warn("Exception while trying to create default reservation queue for plan: {}", (Object)planQueueName, (Object)e);
            }
            catch (IOException e) {
                LOG.warn("Exception while trying to create default reservation queue for plan: {}", (Object)planQueueName, (Object)e);
            }
        }
        curReservationNames.add(defReservationQueue);
        if (Resources.greaterThan((ResourceCalculator)this.scheduler.getResourceCalculator(), (Resource)clusterResources, (Resource)reservedResources, (Resource)planResources)) {
            try {
                plan.getReplanner().plan(plan, null);
            }
            catch (PlanningException e) {
                LOG.warn("Exception while trying to replan: {}", (Object)planQueueName, (Object)e);
            }
        }
        List<CSQueue> resQueues = planQueue.getChildQueues();
        HashSet<String> expired = new HashSet<String>();
        for (CSQueue resQueue : resQueues) {
            String resQueueName = resQueue.getQueueName();
            if (curReservationNames.contains(resQueueName)) {
                curReservationNames.remove(resQueueName);
                continue;
            }
            expired.add(resQueueName);
        }
        this.cleanupExpiredQueues(plan.getMoveOnExpiry(), expired, defReservationQueue);
        float totalAssignedCapacity = 0.0f;
        if (currentReservations != null) {
            try {
                this.scheduler.setEntitlement(defReservationQueue, new QueueEntitlement(0.0f, 1.0f));
            }
            catch (YarnException e) {
                LOG.warn("Exception while trying to release default queue capacity for plan: {}", (Object)planQueueName, (Object)e);
            }
            List<ReservationAllocation> sortedAllocations = this.sortByDelta(new ArrayList<ReservationAllocation>(currentReservations), now);
            for (ReservationAllocation res : sortedAllocations) {
                String currResId = res.getReservationId().toString();
                if (curReservationNames.contains(currResId)) {
                    try {
                        ReservationQueue resQueue = new ReservationQueue(this.scheduler, currResId, planQueue);
                        this.scheduler.addQueue(resQueue);
                    }
                    catch (SchedulerDynamicEditException e) {
                        LOG.warn("Exception while trying to activate reservation: {} for plan: {}", new Object[]{currResId, planQueueName, e});
                    }
                    catch (IOException e) {
                        LOG.warn("Exception while trying to activate reservation: {} for plan: {}", new Object[]{currResId, planQueueName, e});
                    }
                }
                Resource capToAssign = res.getResourcesAtTime(now);
                float targetCapacity = 0.0f;
                if (planResources.getMemory() > 0 && planResources.getVirtualCores() > 0) {
                    targetCapacity = Resources.divide((ResourceCalculator)this.scheduler.getResourceCalculator(), (Resource)clusterResources, (Resource)capToAssign, (Resource)planResources);
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Assigning capacity of {} to queue {} with target capacity {}", new Object[]{capToAssign, currResId, Float.valueOf(targetCapacity)});
                }
                float maxCapacity = 1.0f;
                if (res.containsGangs()) {
                    maxCapacity = targetCapacity;
                }
                try {
                    this.scheduler.setEntitlement(currResId, new QueueEntitlement(targetCapacity, maxCapacity));
                }
                catch (YarnException e) {
                    LOG.warn("Exception while trying to size reservation for plan: {}", new Object[]{currResId, planQueueName, e});
                }
                totalAssignedCapacity += targetCapacity;
            }
        }
        float defQCap = 1.0f - totalAssignedCapacity;
        if (LOG.isDebugEnabled()) {
            LOG.debug("PlanFollowerEditPolicyTask: total Plan Capacity: {} currReservation: {} default-queue capacity: {}", new Object[]{planResources, numRes, Float.valueOf(defQCap)});
        }
        try {
            this.scheduler.setEntitlement(defReservationQueue, new QueueEntitlement(defQCap, 1.0f));
        }
        catch (YarnException e) {
            LOG.warn("Exception while trying to reclaim default queue capacity for plan: {}", (Object)planQueueName, (Object)e);
        }
        try {
            plan.archiveCompletedReservations(now);
        }
        catch (PlanningException e) {
            LOG.error("Exception in archiving completed reservations: ", (Throwable)e);
        }
        LOG.info("Finished iteration of plan follower edit policy for plan: " + planQueueName);
    }

    private void moveAppsInQueueSync(String expiredReservation, String defReservationQueue) {
        List<ApplicationAttemptId> activeApps = this.scheduler.getAppsInQueue(expiredReservation);
        if (activeApps.isEmpty()) {
            return;
        }
        for (ApplicationAttemptId app : activeApps) {
            try {
                this.scheduler.moveApplication(app.getApplicationId(), defReservationQueue);
            }
            catch (YarnException e) {
                LOG.warn("Encountered unexpected error during migration of application: {} from reservation: {}", new Object[]{app, expiredReservation, e});
            }
        }
    }

    private void cleanupExpiredQueues(boolean shouldMove, Set<String> toRemove, String defReservationQueue) {
        for (String expiredReservation : toRemove) {
            try {
                this.scheduler.setEntitlement(expiredReservation, new QueueEntitlement(0.0f, 0.0f));
                if (shouldMove) {
                    this.moveAppsInQueueSync(expiredReservation, defReservationQueue);
                }
                if (this.scheduler.getAppsInQueue(expiredReservation).size() > 0) {
                    this.scheduler.killAllAppsInQueue(expiredReservation);
                    LOG.info("Killing applications in queue: {}", (Object)expiredReservation);
                    continue;
                }
                this.scheduler.removeQueue(expiredReservation);
                LOG.info("Queue: " + expiredReservation + " removed");
            }
            catch (YarnException e) {
                LOG.warn("Exception while trying to expire reservation: {}", (Object)expiredReservation, (Object)e);
            }
        }
    }

    @Override
    public synchronized void setPlans(Collection<Plan> plans) {
        this.plans.clear();
        this.plans.addAll(plans);
    }

    private List<ReservationAllocation> sortByDelta(List<ReservationAllocation> currentReservations, long now) {
        Collections.sort(currentReservations, new ReservationAllocationComparator(this.scheduler, now));
        return currentReservations;
    }

    private static class ReservationAllocationComparator
    implements Comparator<ReservationAllocation> {
        CapacityScheduler scheduler;
        long now;

        ReservationAllocationComparator(CapacityScheduler scheduler, long now) {
            this.scheduler = scheduler;
            this.now = now;
        }

        private Resource getUnallocatedReservedResources(ReservationAllocation reservation) {
            CSQueue resQueue = this.scheduler.getQueue(reservation.getReservationId().toString());
            Resource resResource = resQueue != null ? Resources.subtract((Resource)reservation.getResourcesAtTime(this.now), (Resource)Resources.multiply((Resource)this.scheduler.getClusterResource(), (double)resQueue.getAbsoluteCapacity())) : reservation.getResourcesAtTime(this.now);
            return resResource;
        }

        @Override
        public int compare(ReservationAllocation lhs, ReservationAllocation rhs) {
            Resource lhsRes = this.getUnallocatedReservedResources(lhs);
            Resource rhsRes = this.getUnallocatedReservedResources(rhs);
            return lhsRes.compareTo((Object)rhsRes);
        }
    }
}

