/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
import java.net.URLConnection;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.SequencedCollection;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import javax.crypto.SecretKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumFileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.IFile;
import org.apache.hadoop.mapred.IFileInputStream;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MRConstants;
import org.apache.hadoop.mapred.MapTaskCompletionEventsUpdate;
import org.apache.hadoop.mapred.Merger;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RamManager;
import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.mapred.ReduceTaskRunner;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SkipBadRecords;
import org.apache.hadoop.mapred.SortedRanges;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapred.TaskRunner;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapred.TaskTracker;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.Updater;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;

class ReduceTask
extends Task {
    // Class logger; assigned in the static initializer of this class.
    private static final Log LOG;
    // Number of map tasks feeding this reduce; carried through write()/readFields().
    private int numMaps;
    // Created in run() only when the job is not running in "local" mode.
    private ReduceCopier reduceCopier;
    // Map-output compression codec as returned by initCodec(); may be null.
    private CompressionCodec codec;
    // Progress phases registered in run() under the names "copy", "sort" and "reduce".
    private Progress copyPhase;
    private Progress sortPhase;
    private Progress reducePhase;
    // Counters looked up once in the constructors.
    private Counters.Counter reduceShuffleBytes;
    private Counters.Counter reduceInputKeyCounter;
    private Counters.Counter reduceInputValueCounter;
    private Counters.Counter reduceOutputCounter;
    private Counters.Counter reduceCombineOutputCounter;
    // Ordering used by mapOutputFilesOnDisk (compares file length first).
    private Comparator<FileStatus> mapOutputFileComparator;
    // On-disk map outputs, kept sorted by mapOutputFileComparator; cleared in run() once the merge iterator exists.
    private final SortedSet<FileStatus> mapOutputFilesOnDisk;

    /**
     * Creates an unconfigured reduce task (this is the constructor used by the
     * {@link WritableFactory} registered in the static initializer).
     *
     * <p>Sets the status/phase, resolves the reduce counters and builds the sorted
     * set that tracks on-disk map outputs.
     */
    public ReduceTask() {
        this.getProgress().setStatus("reduce");
        this.setPhase(TaskStatus.Phase.SHUFFLE);
        this.reduceShuffleBytes = this.getCounters().findCounter(Task.Counter.REDUCE_SHUFFLE_BYTES);
        this.reduceInputKeyCounter = this.getCounters().findCounter(Task.Counter.REDUCE_INPUT_GROUPS);
        this.reduceInputValueCounter = this.getCounters().findCounter(Task.Counter.REDUCE_INPUT_RECORDS);
        this.reduceOutputCounter = this.getCounters().findCounter(Task.Counter.REDUCE_OUTPUT_RECORDS);
        this.reduceCombineOutputCounter = this.getCounters().findCounter(Task.Counter.COMBINE_OUTPUT_RECORDS);
        this.mapOutputFileComparator = new Comparator<FileStatus>(){

            /**
             * Orders files by length, smallest first; files of equal length are
             * ordered by their path string so that the comparison is antisymmetric
             * (compare(a, b) == -compare(b, a)) as TreeSet requires. Returns 0 only
             * when both length and path string match.
             */
            @Override
            public int compare(FileStatus a, FileStatus b) {
                long lenA = a.getLen();
                long lenB = b.getLen();
                if (lenA != lenB) {
                    return lenA < lenB ? -1 : 1;
                }
                // Tie-break deterministically instead of always answering "less than".
                return a.getPath().toString().compareTo(b.getPath().toString());
            }
        };
        this.mapOutputFilesOnDisk = new TreeSet<FileStatus>(this.mapOutputFileComparator);
    }

    public ReduceTask(String jobFile, TaskAttemptID taskId, int partition, int numMaps, int numSlotsRequired) {
        super(jobFile, taskId, partition, numSlotsRequired);
        this.getProgress().setStatus("reduce");
        this.setPhase(TaskStatus.Phase.SHUFFLE);
        this.reduceShuffleBytes = this.getCounters().findCounter(Task.Counter.REDUCE_SHUFFLE_BYTES);
        this.reduceInputKeyCounter = this.getCounters().findCounter(Task.Counter.REDUCE_INPUT_GROUPS);
        this.reduceInputValueCounter = this.getCounters().findCounter(Task.Counter.REDUCE_INPUT_RECORDS);
        this.reduceOutputCounter = this.getCounters().findCounter(Task.Counter.REDUCE_OUTPUT_RECORDS);
        this.reduceCombineOutputCounter = this.getCounters().findCounter(Task.Counter.COMBINE_OUTPUT_RECORDS);
        this.mapOutputFileComparator = new /* invalid duplicate definition of identical inner class */;
        this.mapOutputFilesOnDisk = new TreeSet<FileStatus>(this.mapOutputFileComparator);
        this.numMaps = numMaps;
    }

    /**
     * Instantiates the map-output compression codec configured for this job.
     *
     * @return a codec instance when map-output compression is enabled in the
     *         task configuration, otherwise {@code null}
     */
    private CompressionCodec initCodec() {
        if (!this.conf.getCompressMapOutput()) {
            return null;
        }
        // DefaultCodec is the fallback when no compressor class is configured.
        Class<? extends CompressionCodec> compressorClass = this.conf.getMapOutputCompressorClass(DefaultCodec.class);
        return ReflectionUtils.newInstance(compressorClass, this.conf);
    }

    /**
     * Builds the runner that executes this task.
     *
     * @return a new {@link ReduceTaskRunner} bound to the given task-in-progress,
     *         tracker and running job, using this task's configuration
     */
    @Override
    public TaskRunner createRunner(TaskTracker tracker, TaskTracker.TaskInProgress tip, TaskTracker.RunningJob rjob) throws IOException {
        TaskRunner reduceRunner = new ReduceTaskRunner(tip, tracker, this.conf, rjob);
        return reduceRunner;
    }

    /**
     * @return always {@code false}; this class represents a reduce task
     */
    @Override
    public boolean isMapTask() {
        return false;
    }

    /**
     * @return the number of map tasks recorded for this reduce
     */
    public int getNumMaps() {
        return this.numMaps;
    }

    /**
     * Applies the superclass localization and then records this task's map count
     * in the given configuration via {@code setNumMapTasks}.
     *
     * @param conf the job configuration to update
     * @throws IOException if the superclass localization fails
     */
    @Override
    public void localizeConfiguration(JobConf conf) throws IOException {
        super.localizeConfiguration(conf);
        conf.setNumMapTasks(this.numMaps);
    }

    /**
     * Serializes the superclass state followed by {@code numMaps} as an int.
     * Must stay in step with {@link #readFields(DataInput)}.
     *
     * @param out destination stream
     * @throws IOException on write failure
     */
    @Override
    public void write(DataOutput out) throws IOException {
        super.write(out);
        out.writeInt(this.numMaps);
    }

    /**
     * Restores the superclass state and then reads {@code numMaps} as an int,
     * mirroring the layout produced by {@link #write(DataOutput)}.
     *
     * @param in source stream
     * @throws IOException on read failure
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        super.readFields(in);
        this.numMaps = in.readInt();
    }

    /**
     * Collects the paths of the map outputs this reduce should merge.
     *
     * @param fs      not used by this method
     * @param isLocal when {@code true}, one input file per map index is requested
     *                from {@code mapOutputFile}; otherwise the paths of the files
     *                tracked in {@code mapOutputFilesOnDisk} are returned
     * @return the collected paths, in iteration order
     * @throws IOException if resolving an input file fails
     */
    private Path[] getMapFiles(FileSystem fs, boolean isLocal) throws IOException {
        List<Path> paths = new ArrayList<Path>();
        if (!isLocal) {
            for (FileStatus onDisk : this.mapOutputFilesOnDisk) {
                paths.add(onDisk.getPath());
            }
        } else {
            int mapIndex = 0;
            while (mapIndex < this.numMaps) {
                paths.add(this.mapOutputFile.getInputFile(mapIndex));
                ++mapIndex;
            }
        }
        return paths.toArray(new Path[0]);
    }

    /**
     * Entry point for executing this task: dispatches job-cleanup, job-setup and
     * task-cleanup variants, and otherwise fetches map outputs (unless running in
     * local mode), builds the merged key/value iterator and runs the reducer.
     *
     * @param job       job configuration
     * @param umbilical channel used for status updates and error reporting
     * @throws IOException if the copier fails or any downstream step throws
     * @throws InterruptedException propagated from the reducer run
     * @throws ClassNotFoundException propagated from copier/reducer construction
     */
    @Override
    public void run(JobConf job, TaskUmbilicalProtocol umbilical) throws IOException, InterruptedException, ClassNotFoundException {
        this.umbilical = umbilical;
        job.setBoolean("mapred.skip.on", this.isSkipping());
        // NOTE(review): the three phases are only created when isMapOrReduce() is true,
        // yet they are dereferenced unconditionally further down; presumably the
        // setup/cleanup early returns below cover the other case — confirm.
        if (this.isMapOrReduce()) {
            this.copyPhase = this.getProgress().addPhase("copy");
            this.sortPhase = this.getProgress().addPhase("sort");
            this.reducePhase = this.getProgress().addPhase("reduce");
        }
        Task.TaskReporter reporter = new Task.TaskReporter(this, this.getProgress(), umbilical, this.jvmContext);
        reporter.startCommunicationThread();
        boolean useNewApi = job.getUseNewReducer();
        this.initialize(job, this.getJobID(), reporter, useNewApi);
        // Setup/cleanup variants of the task do no shuffle or reduce work.
        if (this.jobCleanup) {
            this.runJobCleanupTask(umbilical, reporter);
            return;
        }
        if (this.jobSetup) {
            this.runJobSetupTask(umbilical, reporter);
            return;
        }
        if (this.taskCleanup) {
            this.runTaskCleanupTask(umbilical, reporter);
            return;
        }
        this.codec = this.initCodec();
        // "local" (also the default when the key is unset) means there is nothing to copy.
        boolean isLocal = "local".equals(job.get("mapred.job.tracker", "local"));
        if (!isLocal) {
            this.reduceCopier = new ReduceCopier(umbilical, job, reporter);
            if (!this.reduceCopier.fetchOutputs()) {
                // Rethrow an FSError as-is; wrap anything else as the IOException cause.
                if (this.reduceCopier.mergeThrowable instanceof FSError) {
                    throw (FSError)this.reduceCopier.mergeThrowable;
                }
                throw new IOException("Task: " + this.getTaskID() + " - The reduce copier failed", this.reduceCopier.mergeThrowable);
            }
        }
        this.copyPhase.complete();
        this.setPhase(TaskStatus.Phase.SORT);
        this.statusUpdate(umbilical);
        FileSystem rfs = FileSystem.getLocal(job).getRaw();
        // Local mode merges the per-map input files directly; otherwise the copier supplies the iterator.
        RawKeyValueIterator rIter = isLocal ? Merger.merge((Configuration)job, rfs, job.getMapOutputKeyClass(), job.getMapOutputValueClass(), this.codec, this.getMapFiles(rfs, true), !this.conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100), new Path(this.getTaskID().toString()), job.getOutputKeyComparator(), reporter, this.spilledRecordsCounter, null) : this.reduceCopier.createKVIterator(job, rfs, reporter);
        this.mapOutputFilesOnDisk.clear();
        this.sortPhase.complete();
        this.setPhase(TaskStatus.Phase.REDUCE);
        this.statusUpdate(umbilical);
        Class<?> keyClass = job.getMapOutputKeyClass();
        Class<?> valueClass = job.getMapOutputValueClass();
        RawComparator comparator = job.getOutputValueGroupingComparator();
        // Dispatch on the reducer API flavour selected by the job.
        if (useNewApi) {
            this.runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);
        } else {
            this.runOldReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);
        }
        this.done(umbilical, reporter);
    }

    /**
     * Drives a reducer written against the old ({@code org.apache.hadoop.mapred}) API:
     * iterates over key groups from {@code rIter}, calls {@code reduce} once per group
     * and writes collected output through the job's output format.
     *
     * @param job        job configuration (source of reducer class and output format)
     * @param umbilical  passed to the skipping iterator when skipping is enabled
     * @param reporter   receives progress and counter updates
     * @param rIter      merged raw key/value input
     * @param comparator grouping comparator (used by the skipping iterator)
     * @param keyClass   map-output key class
     * @param valueClass map-output value class
     * @throws IOException rethrown after a best-effort close of reducer and writer
     */
    private <INKEY, INVALUE, OUTKEY, OUTVALUE> void runOldReducer(JobConf job, TaskUmbilicalProtocol umbilical, final Task.TaskReporter reporter, RawKeyValueIterator rIter, RawComparator<INKEY> comparator, Class<INKEY> keyClass, Class<INVALUE> valueClass) throws IOException {
        Reducer reducer = ReflectionUtils.newInstance(job.getReducerClass(), job);
        String finalName = ReduceTask.getOutputName(this.getPartition());
        FileSystem fs = FileSystem.get(job);
        final org.apache.hadoop.mapred.RecordWriter out = job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);
        // Collector handed to the reducer: write the record, count it, signal progress.
        OutputCollector collector = new OutputCollector<OUTKEY, OUTVALUE>(){

            @Override
            public void collect(OUTKEY key, OUTVALUE value) throws IOException {
                out.write(key, value);
                ReduceTask.this.reduceOutputCounter.increment(1L);
                reporter.progress();
            }
        };
        try {
            // Only bump the processed-groups counter when skipping is configured and auto-increment is on.
            boolean incrProcCount = SkipBadRecords.getReducerMaxSkipGroups(job) > 0L && SkipBadRecords.getAutoIncrReducerProcCount(job);
            // The non-skipping branch re-reads the grouping comparator from the job instead of using the parameter.
            ReduceValuesIterator values = this.isSkipping() ? new SkippingReduceValuesIterator<INKEY, INVALUE>(rIter, comparator, keyClass, valueClass, job, reporter, umbilical) : new ReduceValuesIterator<INKEY, INVALUE>(rIter, job.getOutputValueGroupingComparator(), keyClass, valueClass, job, reporter);
            values.informReduceProgress();
            while (values.more()) {
                this.reduceInputKeyCounter.increment(1L);
                reducer.reduce(values.getKey(), values, collector, reporter);
                if (incrProcCount) {
                    reporter.incrCounter("SkippingTaskCounters", "ReduceProcessedGroups", 1L);
                }
                values.nextKey();
                values.informReduceProgress();
            }
            reducer.close();
            out.close(reporter);
        }
        catch (IOException ioe) {
            // Best-effort cleanup: close failures are deliberately ignored so the
            // original exception is the one that propagates.
            // NOTE(review): only IOException reaches this path; an unchecked exception
            // from reduce() leaves reducer and writer unclosed. Also, an IOException
            // thrown by the closes at the end of the try block leads to a second close here.
            try {
                reducer.close();
            }
            catch (IOException ignored) {
                // empty catch block
            }
            try {
                out.close(reporter);
            }
            catch (IOException ignored) {
                // empty catch block
            }
            throw ioe;
        }
    }

    /**
     * Drives a reducer written against the new ({@code org.apache.hadoop.mapreduce}) API.
     * The raw iterator is wrapped so that every {@code next()} call updates the reduce
     * phase progress and pings the reporter, then the reducer is run over a context
     * built from that iterator and a counting record writer.
     *
     * @param job        job configuration
     * @param umbilical  not used directly in this method
     * @param reporter   receives progress updates
     * @param rIter      merged raw key/value input
     * @param comparator grouping comparator handed to the reduce context
     * @param keyClass   map-output key class
     * @param valueClass map-output value class
     */
    private <INKEY, INVALUE, OUTKEY, OUTVALUE> void runNewReducer(JobConf job, TaskUmbilicalProtocol umbilical, final Task.TaskReporter reporter, RawKeyValueIterator rIter, RawComparator<INKEY> comparator, Class<INKEY> keyClass, Class<INVALUE> valueClass) throws IOException, InterruptedException, ClassNotFoundException {
        final RawKeyValueIterator rawIter = rIter;
        // Delegating wrapper: identical to rawIter except that next() also reports progress.
        rIter = new RawKeyValueIterator(){

            @Override
            public void close() throws IOException {
                rawIter.close();
            }

            @Override
            public DataInputBuffer getKey() throws IOException {
                return rawIter.getKey();
            }

            @Override
            public Progress getProgress() {
                return rawIter.getProgress();
            }

            @Override
            public DataInputBuffer getValue() throws IOException {
                return rawIter.getValue();
            }

            @Override
            public boolean next() throws IOException {
                boolean ret = rawIter.next();
                // Mirror the underlying iterator's progress into the reduce phase.
                ReduceTask.this.reducePhase.set(rawIter.getProgress().get());
                reporter.progress();
                return ret;
            }
        };
        TaskAttemptContext taskContext = new TaskAttemptContext((Configuration)job, this.getTaskID());
        org.apache.hadoop.mapreduce.Reducer<?, ?, ?, ?> reducer = ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
        RecordWriter output = this.outputFormat.getRecordWriter(taskContext);
        // Wrap the writer together with the reduce-output counter.
        NewTrackingRecordWriter trackedRW = new NewTrackingRecordWriter(output, this.reduceOutputCounter);
        job.setBoolean("mapred.skip.on", this.isSkipping());
        Reducer.Context reducerContext = ReduceTask.createReduceContext(reducer, job, this.getTaskID(), rIter, this.reduceInputKeyCounter, this.reduceInputValueCounter, trackedRW, this.committer, reporter, comparator, keyClass, valueClass);
        reducer.run(reducerContext);
        // NOTE(review): the underlying writer (not trackedRW) is closed, and it is not
        // closed at all if reducer.run() throws — confirm this is intended.
        output.close(reducerContext);
    }

    /**
     * Returns the exponent {@code e} such that {@code 2^e} is the power of two
     * closest to {@code value}. A value whose bit just below the highest set bit
     * is also set (i.e. at or above the midpoint) rounds up to the next exponent.
     *
     * @param value a strictly positive integer
     * @return the exponent of the nearest power of two
     * @throws IllegalArgumentException if {@code value} is zero or negative
     */
    private static int getClosestPowerOf2(int value) {
        if (value <= 0) {
            throw new IllegalArgumentException("Undefined for " + value);
        }
        final int topBit = Integer.highestOneBit(value);
        int exponent = Integer.numberOfTrailingZeros(topBit);
        // The bit right below the top bit decides whether we round up.
        if (((topBit >>> 1) & value) != 0) {
            ++exponent;
        }
        return exponent;
    }

    static {
        WritableFactories.setFactory(ReduceTask.class, new WritableFactory(){

            @Override
            public Writable newInstance() {
                return new ReduceTask();
            }
        });
        LOG = LogFactory.getLog((String)ReduceTask.class.getName());
    }

    class ReduceCopier<K, V>
    implements MRConstants {
        // Collaborators supplied to the constructor.
        private TaskUmbilicalProtocol umbilical;
        private final Task.TaskReporter reporter;
        // NOTE(review): the named constants in this class are not referenced by name in the
        // visible (decompiled) code; their values appear to be inlined at the use sites — confirm.
        private static final int STALLED_COPY_TIMEOUT = 180000;
        private static final int MAX_EVENTS_TO_FETCH = 10000;
        // The enclosing ReduceTask instance (set to ReduceTask.this in the constructor).
        private ReduceTask reduceTask;
        // Locations handed to the copier threads; fetchOutputs() fills it and calls notifyAll() under its monitor.
        private List<MapOutputLocation> scheduledCopies;
        private List<CopyResult> copyResults;
        // Read from "mapred.reduce.parallel.copies" (default 5); maxInFlight is four times this value.
        private int numCopiers;
        private int maxInFlight;
        // Host -> timestamp before which fetchOutputs() will not schedule that host again.
        private Map<String, Long> penaltyBox;
        // Hosts that currently have a copy scheduled; a host is removed once its result is processed.
        private Set<String> uniqueHosts;
        private ShuffleRamManager ramManager;
        private FileSystem localFileSys;
        // Raw file system obtained from localFileSys.
        private FileSystem rfs;
        // Read from "io.sort.factor" (default 10 in the constructor).
        private int ioSortFactor;
        // Non-null value ends the fetch loop; ReduceTask.run() rethrows or wraps it.
        private volatile Throwable mergeThrowable;
        private volatile boolean exitLocalFSMerge = false;
        // Set to true by fetchOutputs() once its main loop finishes.
        private volatile boolean exitGetMapEvents = false;
        // Thresholds read in the constructor from "mapred.inmem.merge.threshold",
        // "mapred.job.shuffle.merge.percent" and "mapred.job.reduce.input.buffer.percent".
        private final int maxInMemOutputs;
        private final float maxInMemCopyPer;
        private final long maxInMemReduce;
        private List<MapOutputCopier> copiers = null;
        private ShuffleClientMetrics shuffleClientMetrics = null;
        private static final long MIN_POLL_INTERVAL = 1000L;
        // Failed fetches queued for rescheduling on the next pass of the fetch loop.
        private List<MapOutputLocation> retryFetches = new ArrayList<MapOutputLocation>();
        // The fetch loop runs until this set holds numMaps entries.
        private Set<TaskID> copiedMapOutputs = Collections.synchronizedSet(new TreeSet());
        // Attempts whose locations are dropped instead of being scheduled.
        private Set<TaskAttemptID> obsoleteMapIds = Collections.synchronizedSet(new TreeSet());
        // Seeded in the constructor; used to shuffle the host list before scheduling.
        private Random random = null;
        private int maxMapRuntime;
        // Read from "mapreduce.reduce.shuffle.maxfetchfailures" (default 10).
        private int maxFetchFailuresBeforeReporting;
        // max(30, numMaps / 10); reaching it for one map attempt triggers a shuffle error report.
        private final int abortFailureLimit;
        private static final long INITIAL_PENALTY = 10000L;
        private static final float PENALTY_GROWTH_RATE = 1.3f;
        private static final int REPORT_FAILURE_LIMIT = 10;
        // combineCollector is created only when a combiner runner exists.
        private Task.CombinerRunner combinerRunner;
        private Task.CombineOutputCollector combineCollector = null;
        private static final float MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT = 0.5f;
        private static final float MIN_REQUIRED_PROGRESS_PERCENT = 0.5f;
        private static final float MAX_ALLOWED_STALL_TIME_PERCENT = 0.5f;
        private static final int MIN_FETCH_RETRIES_PER_MAP = 2;
        private static final float MIN_PENDING_MAPS_PERCENT = 0.25f;
        // Capped at numMaps in the constructor.
        private int maxFailedUniqueFetches = 5;
        // Maps with a fetch failure at the reporting threshold; an entry is removed when that map is copied successfully.
        Set<TaskID> fetchFailedMaps = new TreeSet<TaskID>();
        // Per-attempt count of failed fetches.
        Map<TaskAttemptID, Integer> mapTaskToFailedFetchesMap = new HashMap<TaskAttemptID, Integer>();
        private static final int BACKOFF_INIT = 4000;
        private static final int MIN_LOG_TIME = 60000;
        private final List<MapOutput> mapOutputsFilesInMemory = Collections.synchronizedList(new LinkedList());
        // Host -> known map-output locations not yet scheduled.
        private final Map<String, List<MapOutputLocation>> mapLocations = new ConcurrentHashMap<String, List<MapOutputLocation>>();
        private int nextMapOutputCopierId = 0;
        // Read from "mapreduce.reduce.shuffle.notify.readerror" (default true).
        private boolean reportReadErrorImmediately;

        /**
         * Installs a {@link URLClassLoader} on the configuration so that classes
         * shipped with the job can be loaded. The search path consists of every
         * entry under {@code <jobCacheDir>/lib}, then {@code <jobCacheDir>/classes},
         * then the job cache directory itself (all only when the job has a jar),
         * followed by the directory containing the job file.
         *
         * <p>Files are converted with {@code toURI().toURL()} rather than the
         * deprecated {@code File.toURL()}, which does not escape characters that
         * are illegal in URLs.
         *
         * @param conf job configuration whose class loader is replaced; its current
         *             class loader becomes the parent of the new one
         * @throws IOException if a path cannot be converted into a URL
         */
        private void configureClasspath(JobConf conf) throws IOException {
            ReduceTask task = ReduceTask.this;
            ClassLoader parent = conf.getClassLoader();
            File workDir = new File(task.getJobFile()).getParentFile();
            ArrayList<URL> urllist = new ArrayList<URL>();
            String jar = conf.getJar();
            if (jar != null) {
                File jobCacheDir = new File(new Path(jar).getParent().toString());
                // listFiles() returns null when the lib directory does not exist.
                File[] libs = new File(jobCacheDir, "lib").listFiles();
                if (libs != null) {
                    for (File lib : libs) {
                        urllist.add(lib.toURI().toURL());
                    }
                }
                urllist.add(new File(jobCacheDir, "classes").toURI().toURL());
                urllist.add(jobCacheDir.toURI().toURL());
            }
            urllist.add(workDir.toURI().toURL());
            URL[] urls = urllist.toArray(new URL[urllist.size()]);
            URLClassLoader loader = new URLClassLoader(urls, parent);
            conf.setClassLoader(loader);
        }

        /**
         * Prepares the shuffle machinery for the enclosing reduce task: sets up the
         * job class path, reads the shuffle tuning knobs from the configuration and
         * creates the bookkeeping structures used by {@code fetchOutputs()}.
         *
         * @param umbilical channel used to report shuffle errors
         * @param conf      job configuration (its class loader is replaced)
         * @param reporter  progress/counter reporter
         * @throws ClassNotFoundException propagated from combiner-runner creation
         * @throws IOException if "mapred.job.reduce.input.buffer.percent" is outside
         *         [0, 1] or a local file system / class path step fails
         */
        public ReduceCopier(TaskUmbilicalProtocol umbilical, JobConf conf, Task.TaskReporter reporter) throws ClassNotFoundException, IOException {
            this.configureClasspath(conf);
            this.reporter = reporter;
            this.shuffleClientMetrics = new ShuffleClientMetrics(conf);
            this.umbilical = umbilical;
            this.reduceTask = ReduceTask.this;
            this.scheduledCopies = new ArrayList<MapOutputLocation>(100);
            this.copyResults = new ArrayList<CopyResult>(100);
            this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
            this.maxInFlight = 4 * this.numCopiers;
            Counter combineInputCounter = reporter.getCounter((Enum)Task.Counter.COMBINE_INPUT_RECORDS);
            this.combinerRunner = Task.CombinerRunner.create(conf, ReduceTask.this.getTaskID(), (Counters.Counter)combineInputCounter, reporter, null);
            // A collector is only needed when the job actually has a combiner.
            if (this.combinerRunner != null) {
                this.combineCollector = new Task.CombineOutputCollector(ReduceTask.this.reduceCombineOutputCounter);
            }
            this.ioSortFactor = conf.getInt("io.sort.factor", 10);
            this.abortFailureLimit = Math.max(30, ReduceTask.this.numMaps / 10);
            this.maxFetchFailuresBeforeReporting = conf.getInt("mapreduce.reduce.shuffle.maxfetchfailures", 10);
            // Never require more unique failed maps than there are maps.
            this.maxFailedUniqueFetches = Math.min(ReduceTask.this.numMaps, this.maxFailedUniqueFetches);
            this.maxInMemOutputs = conf.getInt("mapred.inmem.merge.threshold", 1000);
            this.maxInMemCopyPer = conf.getFloat("mapred.job.shuffle.merge.percent", 0.66f);
            float maxRedPer = conf.getFloat("mapred.job.reduce.input.buffer.percent", 0.0f);
            if ((double)maxRedPer > 1.0 || (double)maxRedPer < 0.0) {
                // Spell out the key and the offending value; the previous message ran them together.
                throw new IOException("Invalid value for mapred.job.reduce.input.buffer.percent: " + maxRedPer);
            }
            // Fraction of the JVM max heap, clamped to the int range.
            this.maxInMemReduce = (int)Math.min((float)Runtime.getRuntime().maxMemory() * maxRedPer, 2.1474836E9f);
            this.ramManager = new ShuffleRamManager(conf);
            this.localFileSys = FileSystem.getLocal(conf);
            this.rfs = ((LocalFileSystem)this.localFileSys).getRaw();
            this.penaltyBox = new LinkedHashMap<String, Long>();
            this.uniqueHosts = new HashSet<String>();
            // Mix the partition number into the seed so the seed differs per partition.
            long randomSeed = System.nanoTime() + (long)Math.pow(this.reduceTask.getPartition(), this.reduceTask.getPartition() % 10);
            this.random = new Random(randomSeed);
            this.maxMapRuntime = 0;
            this.reportReadErrorImmediately = conf.getBoolean("mapreduce.reduce.shuffle.notify.readerror", true);
        }

        /**
         * @param numInFlight number of copies currently outstanding
         * @return {@code true} when {@code numInFlight} strictly exceeds {@code maxInFlight}
         */
        private boolean busyEnough(int numInFlight) {
            return numInFlight > this.maxInFlight;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public boolean fetchOutputs() throws IOException {
            long startTime;
            int i;
            int totalFailures = 0;
            int numInFlight = 0;
            int numCopied = 0;
            DecimalFormat mbpsFormat = new DecimalFormat("0.00");
            Progress copyPhase = this.reduceTask.getProgress().phase();
            LocalFSMerger localFSMergerThread = null;
            InMemFSMergeThread inMemFSMergeThread = null;
            GetMapEventsThread getMapEventsThread = null;
            for (i = 0; i < ReduceTask.this.numMaps; ++i) {
                copyPhase.addPhase();
            }
            this.copiers = new ArrayList<MapOutputCopier>(this.numCopiers);
            for (i = 0; i < this.numCopiers; ++i) {
                MapOutputCopier copier = new MapOutputCopier(ReduceTask.this.conf, this.reporter, this.reduceTask.getJobTokenSecret());
                this.copiers.add(copier);
                copier.start();
            }
            localFSMergerThread = new LocalFSMerger((LocalFileSystem)this.localFileSys);
            inMemFSMergeThread = new InMemFSMergeThread();
            localFSMergerThread.start();
            inMemFSMergeThread.start();
            getMapEventsThread = new GetMapEventsThread();
            getMapEventsThread.start();
            long currentTime = startTime = System.currentTimeMillis();
            long lastProgressTime = startTime;
            long lastOutputTime = 0L;
            block23: while (this.copiedMapOutputs.size() < ReduceTask.this.numMaps && this.mergeThrowable == null) {
                currentTime = System.currentTimeMillis();
                boolean logNow = false;
                if (currentTime - lastOutputTime > 60000L) {
                    lastOutputTime = currentTime;
                    logNow = true;
                }
                if (logNow) {
                    LOG.info((Object)(this.reduceTask.getTaskID() + " Need another " + (ReduceTask.this.numMaps - this.copiedMapOutputs.size()) + " map output(s) " + "where " + numInFlight + " is already in progress"));
                }
                for (MapOutputLocation loc : this.retryFetches) {
                    List<MapOutputLocation> locList = this.mapLocations.get(loc.getHost());
                    if (locList == null) continue;
                    locList.add(0, loc);
                }
                if (this.retryFetches.size() > 0) {
                    LOG.info((Object)(this.reduceTask.getTaskID() + ": " + "Got " + this.retryFetches.size() + " map-outputs from previous failures"));
                }
                this.retryFetches.clear();
                int numScheduled = 0;
                int numDups = 0;
                List<MapOutputLocation> list = this.scheduledCopies;
                synchronized (list) {
                    ArrayList<String> hostList = new ArrayList<String>();
                    hostList.addAll(this.mapLocations.keySet());
                    Collections.shuffle(hostList, this.random);
                    for (String host : hostList) {
                        List<MapOutputLocation> knownOutputsByLoc = this.mapLocations.get(host);
                        if (knownOutputsByLoc == null || knownOutputsByLoc.size() == 0) continue;
                        if (this.uniqueHosts.contains(host)) {
                            numDups += knownOutputsByLoc.size();
                            continue;
                        }
                        Long penaltyEnd = this.penaltyBox.get(host);
                        boolean penalized = false;
                        if (penaltyEnd != null) {
                            if (currentTime < penaltyEnd) {
                                penalized = true;
                            } else {
                                this.penaltyBox.remove(host);
                            }
                        }
                        if (penalized) continue;
                        List<MapOutputLocation> list2 = knownOutputsByLoc;
                        synchronized (list2) {
                            Iterator<MapOutputLocation> locItr = knownOutputsByLoc.iterator();
                            while (locItr.hasNext()) {
                                MapOutputLocation loc = locItr.next();
                                if (this.obsoleteMapIds.contains(loc.getTaskAttemptId())) {
                                    locItr.remove();
                                    continue;
                                }
                                this.uniqueHosts.add(host);
                                this.scheduledCopies.add(loc);
                                locItr.remove();
                                ++numInFlight;
                                ++numScheduled;
                                break;
                            }
                        }
                    }
                    this.scheduledCopies.notifyAll();
                }
                if (numScheduled > 0 || logNow) {
                    LOG.info((Object)(this.reduceTask.getTaskID() + " Scheduled " + numScheduled + " outputs (" + this.penaltyBox.size() + " slow hosts and" + numDups + " dup hosts)"));
                }
                if (this.penaltyBox.size() > 0 && logNow) {
                    LOG.info((Object)"Penalized(slow) Hosts: ");
                    for (String host : this.penaltyBox.keySet()) {
                        LOG.info((Object)(host + " Will be considered after: " + (this.penaltyBox.get(host) - currentTime) / 1000L + " seconds."));
                    }
                }
                try {
                    if (numInFlight == 0 && numScheduled == 0) {
                        this.reporter.progress();
                        Thread.sleep(5000L);
                    }
                }
                catch (InterruptedException e) {
                    // empty catch block
                }
                while (numInFlight > 0 && this.mergeThrowable == null) {
                    LOG.debug((Object)(this.reduceTask.getTaskID() + " numInFlight = " + numInFlight));
                    CopyResult cr = this.getCopyResult(numInFlight);
                    if (cr == null) continue block23;
                    if (cr.getSuccess()) {
                        ++numCopied;
                        lastProgressTime = System.currentTimeMillis();
                        ReduceTask.this.reduceShuffleBytes.increment(cr.getSize());
                        long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000L + 1L;
                        float mbs = (float)ReduceTask.this.reduceShuffleBytes.getCounter() / 1048576.0f;
                        float transferRate = mbs / (float)secsSinceStart;
                        copyPhase.startNextPhase();
                        copyPhase.setStatus("copy (" + numCopied + " of " + ReduceTask.this.numMaps + " at " + mbpsFormat.format(transferRate) + " MB/s)");
                        this.fetchFailedMaps.remove(cr.getLocation().getTaskId());
                    } else if (cr.isObsolete()) {
                        LOG.info((Object)(this.reduceTask.getTaskID() + " Ignoring obsolete copy result for Map Task: " + cr.getLocation().getTaskAttemptId() + " from host: " + cr.getHost()));
                    } else {
                        this.retryFetches.add(cr.getLocation());
                        TaskAttemptID mapTaskId = cr.getLocation().getTaskAttemptId();
                        TaskID mapId = cr.getLocation().getTaskId();
                        ++totalFailures;
                        Integer noFailedFetches = this.mapTaskToFailedFetchesMap.get(mapTaskId);
                        noFailedFetches = noFailedFetches == null ? 1 : noFailedFetches + 1;
                        this.mapTaskToFailedFetchesMap.put(mapTaskId, noFailedFetches);
                        LOG.info((Object)("Task " + ReduceTask.this.getTaskID() + ": Failed fetch #" + noFailedFetches + " from " + mapTaskId));
                        if (noFailedFetches >= this.abortFailureLimit) {
                            LOG.fatal((Object)(noFailedFetches + " failures downloading " + ReduceTask.this.getTaskID() + "."));
                            this.umbilical.shuffleError(ReduceTask.this.getTaskID(), "Exceeded the abort failure limit; bailing-out.", ReduceTask.this.jvmContext);
                        }
                        this.checkAndInformJobTracker(noFailedFetches, mapTaskId, cr.getError().equals((Object)CopyOutputErrorType.READ_ERROR));
                        if (noFailedFetches == this.maxFetchFailuresBeforeReporting) {
                            boolean reducerStalled;
                            this.fetchFailedMaps.add(mapId);
                            boolean reducerHealthy = (float)totalFailures / (float)(totalFailures + numCopied) < 0.5f;
                            boolean reducerProgressedEnough = (float)numCopied / (float)ReduceTask.this.numMaps >= 0.5f;
                            int stallDuration = (int)(System.currentTimeMillis() - lastProgressTime);
                            int shuffleProgressDuration = (int)(lastProgressTime - startTime);
                            int minShuffleRunDuration = shuffleProgressDuration > this.maxMapRuntime ? shuffleProgressDuration : this.maxMapRuntime;
                            boolean bl = reducerStalled = (float)stallDuration / (float)minShuffleRunDuration >= 0.5f;
                            if (!(this.fetchFailedMaps.size() < this.maxFailedUniqueFetches && this.fetchFailedMaps.size() != ReduceTask.this.numMaps - this.copiedMapOutputs.size() || reducerHealthy || reducerProgressedEnough && !reducerStalled)) {
                                LOG.fatal((Object)("Shuffle failed with too many fetch failures and insufficient progress!Killing task " + ReduceTask.this.getTaskID() + "."));
                                this.umbilical.shuffleError(ReduceTask.this.getTaskID(), "Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out.", ReduceTask.this.jvmContext);
                            }
                        }
                        currentTime = System.currentTimeMillis();
                        long currentBackOff = (long)(10000.0 * Math.pow(1.3f, noFailedFetches.intValue()));
                        this.penaltyBox.put(cr.getHost(), currentTime + currentBackOff);
                        LOG.warn((Object)(this.reduceTask.getTaskID() + " adding host " + cr.getHost() + " to penalty box, next contact in " + currentBackOff / 1000L + " seconds"));
                    }
                    this.uniqueHosts.remove(cr.getHost());
                    --numInFlight;
                }
            }
            this.exitGetMapEvents = true;
            try {
                getMapEventsThread.join();
                LOG.info((Object)"getMapsEventsThread joined.");
            }
            catch (InterruptedException ie) {
                LOG.info((Object)("getMapsEventsThread threw an exception: " + StringUtils.stringifyException(ie)));
            }
            SequencedCollection<MapOutputCopier> ie = this.copiers;
            synchronized (ie) {
                List<MapOutputLocation> list = this.scheduledCopies;
                synchronized (list) {
                    for (MapOutputCopier copier : this.copiers) {
                        copier.interrupt();
                    }
                    this.copiers.clear();
                }
            }
            ie = ReduceTask.this.mapOutputFilesOnDisk;
            synchronized (ie) {
                this.exitLocalFSMerge = true;
                ReduceTask.this.mapOutputFilesOnDisk.notify();
            }
            this.ramManager.close();
            if (this.mergeThrowable == null) {
                try {
                    localFSMergerThread.join();
                    LOG.info((Object)("Interleaved on-disk merge complete: " + ReduceTask.this.mapOutputFilesOnDisk.size() + " files left."));
                    inMemFSMergeThread.join();
                    LOG.info((Object)("In-memory merge complete: " + this.mapOutputsFilesInMemory.size() + " files left."));
                }
                catch (InterruptedException ie2) {
                    LOG.warn((Object)(this.reduceTask.getTaskID() + " Final merge of the inmemory files threw an exception: " + StringUtils.stringifyException(ie2)));
                    if (this.mergeThrowable != null) {
                        this.mergeThrowable = ie2;
                    }
                    return false;
                }
            }
            return this.mergeThrowable == null && this.copiedMapOutputs.size() == ReduceTask.this.numMaps;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Records a failed fetch of {@code mapId} in this task's status when the failure is due
         * for reporting.
         *
         * <p>A failure is recorded either when it was a read error and
         * {@code reportReadErrorImmediately} is set, or when {@code failures} is a multiple of
         * {@code maxFetchFailuresBeforeReporting}.
         *
         * @param failures number of failed fetches seen so far for this map attempt
         * @param mapId the map attempt whose output could not be fetched
         * @param readError whether the latest failure was classified as a read error
         */
        protected void checkAndInformJobTracker(int failures, TaskAttemptID mapId, boolean readError) {
            boolean immediateReadError = this.reportReadErrorImmediately && readError;
            // The modulo is only evaluated when the read-error shortcut does not apply.
            if (!immediateReadError && failures % this.maxFetchFailuresBeforeReporting != 0) {
                return;
            }
            synchronized (ReduceTask.this) {
                ReduceTask.this.taskStatus.addFetchFailedMap(mapId);
                this.reporter.progress();
                LOG.info("Failed to fetch map-output from " + mapId + " even after MAX_FETCH_RETRIES_PER_MAP retries... " + " or it is a read error, " + " reporting to the JobTracker");
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Moves in-memory map outputs, oldest first, into {@code inMemorySegments} until the
         * bytes still held in {@code mapOutputsFilesInMemory} no longer exceed
         * {@code leaveBytes}.
         *
         * @param inMemorySegments receives one segment per map output removed from memory
         * @param leaveBytes upper bound on the bytes allowed to remain in the in-memory list
         * @return total number of bytes moved into {@code inMemorySegments}
         * @throws IOException declared for callers; propagated from segment/reader construction
         */
        private long createInMemorySegments(List<Merger.Segment<K, V>> inMemorySegments, long leaveBytes) throws IOException {
            long movedBytes = 0L;
            synchronized (this.mapOutputsFilesInMemory) {
                // First pass: how many bytes are currently resident.
                long residentBytes = 0L;
                Iterator<MapOutput> resident = this.mapOutputsFilesInMemory.iterator();
                while (resident.hasNext()) {
                    residentBytes += (long)resident.next().data.length;
                }
                // Second pass: peel outputs off the head of the list until we are under budget.
                while (residentBytes > leaveBytes) {
                    MapOutput head = this.mapOutputsFilesInMemory.remove(0);
                    long headBytes = (long)head.data.length;
                    movedBytes += headBytes;
                    residentBytes -= headBytes;
                    IFile.InMemoryReader headReader = new IFile.InMemoryReader(this.ramManager, head.mapAttemptId, head.data, 0, head.data.length);
                    inMemorySegments.add(new Merger.Segment(headReader, true));
                }
            }
            return movedBytes;
        }

        /**
         * Builds the {@link RawKeyValueIterator} that merges every map output held by this
         * copier, both the ones still in memory and the ones already on disk.
         *
         * <p>Steps, in order:
         * <ol>
         *   <li>In-memory outputs are drained until at most {@code maxInMemReduce} bytes remain.
         *       The drained segments are either merged into one new on-disk file (when the
         *       number of on-disk files is below {@code ioSortFactor}) or kept aside to be
         *       merged together with the on-disk files.</li>
         *   <li>One segment is created per on-disk file; the segments are sorted by length.</li>
         *   <li>All remaining in-memory outputs are drained into the final segment list.</li>
         *   <li>If there is any on-disk data, it is merged; the result is returned directly when
         *       no in-memory segments are left, otherwise it is wrapped as one more segment and
         *       merged with the in-memory segments.</li>
         * </ol>
         *
         * @param job job configuration supplying key/value classes and the key comparator
         * @param fs file system used to read and write the on-disk segments
         * @param reporter progress reporter handed to the merger
         * @return iterator over the merged map outputs
         * @throws IOException if merging or file access fails
         */
        private RawKeyValueIterator createKVIterator(JobConf job, FileSystem fs, Reporter reporter) throws IOException {
            Path[] onDisk;
            Class<?> keyClass = job.getMapOutputKeyClass();
            Class<?> valueClass = job.getMapOutputValueClass();
            boolean keepInputs = job.getKeepFailedTaskFiles();
            Path tmpDir = new Path(ReduceTask.this.getTaskID().toString());
            RawComparator comparator = job.getOutputKeyComparator();
            // Segments drained from memory that are destined for disk (or for the disk merge).
            ArrayList<Merger.Segment<K, V>> memDiskSegments = new ArrayList<Merger.Segment<K, V>>();
            long inMemToDiskBytes = 0L;
            if (this.mapOutputsFilesInMemory.size() > 0) {
                // Map id of the first in-memory output; passed to getInputFileForWrite below.
                TaskID mapId = this.mapOutputsFilesInMemory.get((int)0).mapId;
                // Drain until no more than maxInMemReduce bytes stay in memory.
                inMemToDiskBytes = this.createInMemorySegments(memDiskSegments, this.maxInMemReduce);
                int numMemDiskSegments = memDiskSegments.size();
                if (numMemDiskSegments > 0 && this.ioSortFactor > ReduceTask.this.mapOutputFilesOnDisk.size()) {
                    // Few enough files on disk: merge the drained segments into one new file now.
                    Path outputPath = ReduceTask.this.mapOutputFile.getInputFileForWrite(mapId, inMemToDiskBytes);
                    RawKeyValueIterator rIter = Merger.merge(job, fs, keyClass, valueClass, memDiskSegments, numMemDiskSegments, tmpDir, comparator, reporter, ReduceTask.this.spilledRecordsCounter, null);
                    IFile.Writer writer = new IFile.Writer(job, fs, outputPath, keyClass, valueClass, ReduceTask.this.codec, null);
                    try {
                        Merger.writeFile(rIter, writer, reporter, job);
                        // NOTE(review): the FileStatus is taken before writer.close() runs in the
                        // finally block, so its length may predate the final flush - confirm.
                        this.addToMapOutputFilesOnDisk(fs.getFileStatus(outputPath));
                    }
                    catch (Exception e) {
                        // Remove the partially written file before reporting the failure.
                        if (null != outputPath) {
                            fs.delete(outputPath, true);
                        }
                        throw new IOException("Final merge failed", e);
                    }
                    finally {
                        if (null != writer) {
                            writer.close();
                        }
                    }
                    LOG.info((Object)("Merged " + numMemDiskSegments + " segments, " + inMemToDiskBytes + " bytes to disk to satisfy " + "reduce memory limit"));
                    // The drained data now lives in the new on-disk file.
                    inMemToDiskBytes = 0L;
                    memDiskSegments.clear();
                } else if (inMemToDiskBytes != 0L) {
                    // Keep the drained segments; they join the on-disk merge further down.
                    LOG.info((Object)("Keeping " + numMemDiskSegments + " segments, " + inMemToDiskBytes + " bytes in memory for " + "intermediate, on-disk merge"));
                }
            }
            // One segment per on-disk map file; onDiskBytes also counts the kept-in-memory bytes.
            ArrayList diskSegments = new ArrayList();
            long onDiskBytes = inMemToDiskBytes;
            for (Path file : onDisk = ReduceTask.this.getMapFiles(fs, false)) {
                onDiskBytes += fs.getFileStatus(file).getLen();
                diskSegments.add(new Merger.Segment(job, fs, file, ReduceTask.this.codec, keepInputs));
            }
            LOG.info((Object)("Merging " + onDisk.length + " files, " + onDiskBytes + " bytes from disk"));
            // Order the disk segments by ascending length.
            Collections.sort(diskSegments, new Comparator<Merger.Segment<K, V>>(){

                @Override
                public int compare(Merger.Segment<K, V> o1, Merger.Segment<K, V> o2) {
                    if (o1.getLength() == o2.getLength()) {
                        return 0;
                    }
                    return o1.getLength() < o2.getLength() ? -1 : 1;
                }
            });
            // Drain everything that is still in memory (leaveBytes == 0).
            ArrayList<Merger.Segment<K, V>> finalSegments = new ArrayList<Merger.Segment<K, V>>();
            long inMemBytes = this.createInMemorySegments(finalSegments, 0L);
            LOG.info((Object)("Merging " + finalSegments.size() + " segments, " + inMemBytes + " bytes from memory into reduce"));
            if (0L != onDiskBytes) {
                // Kept in-memory segments go first; their count is passed to the merger.
                int numInMemSegments = memDiskSegments.size();
                diskSegments.addAll(0, memDiskSegments);
                memDiskSegments.clear();
                RawKeyValueIterator diskMerge = Merger.merge(job, fs, keyClass, valueClass, ReduceTask.this.codec, diskSegments, this.ioSortFactor, numInMemSegments, tmpDir, comparator, reporter, false, ReduceTask.this.spilledRecordsCounter, null);
                diskSegments.clear();
                if (0 == finalSegments.size()) {
                    // Nothing left in memory: the disk merge is the final iterator.
                    return diskMerge;
                }
                // Expose the disk merge as one more segment alongside the in-memory ones.
                finalSegments.add(new Merger.Segment(new RawKVIteratorReader(diskMerge, onDiskBytes), true));
            }
            return Merger.merge(job, fs, keyClass, valueClass, finalSegments, finalSegments.size(), tmpDir, comparator, reporter, ReduceTask.this.spilledRecordsCounter, null);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Takes the oldest pending copy result, blocking while none is available as long as
         * {@code busyEnough(numInFlight)} holds.
         *
         * @param numInFlight number of copies currently scheduled and not yet accounted for
         * @return the next copy result, or {@code null} when no result is queued and the copier
         *         is not busy enough to justify waiting
         */
        private CopyResult getCopyResult(int numInFlight) {
            synchronized (this.copyResults) {
                while (this.copyResults.isEmpty()) {
                    try {
                        if (!this.busyEnough(numInFlight)) {
                            return null;
                        }
                        this.copyResults.wait();
                    }
                    catch (InterruptedException ignored) {
                        // Interrupts are ignored here; the loop simply re-checks the queue.
                    }
                }
                return this.copyResults.remove(0);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Registers an on-disk map output and wakes one thread waiting on the set of on-disk
         * outputs.
         *
         * @param status file status of the map output file to register
         */
        private void addToMapOutputFilesOnDisk(FileStatus status) {
            // The set doubles as the monitor: add and notify under the same lock.
            synchronized (ReduceTask.this.mapOutputFilesOnDisk) {
                ReduceTask.this.mapOutputFilesOnDisk.add(status);
                ReduceTask.this.mapOutputFilesOnDisk.notify();
            }
        }

        /**
         * Compiler-generated accessor: post-increments {@code nextMapOutputCopierId} of the given
         * copier and returns the value it had before the increment.
         */
        static /* synthetic */ int access$908(ReduceCopier x0) {
            int issuedId = x0.nextMapOutputCopierId;
            x0.nextMapOutputCopierId = issuedId + 1;
            return issuedId;
        }

        /**
         * Compiler-generated accessor returning the {@code scheduledCopies} list of the given
         * copier, so that inner classes can reach the private field.
         */
        static /* synthetic */ List access$1200(ReduceCopier x0) {
            return x0.scheduledCopies;
        }

        /**
         * Daemon thread that repeatedly asks the umbilical for map task completion events and
         * records where successful map outputs can be fetched, until
         * {@code exitGetMapEvents} is set or the thread is interrupted.
         */
        private class GetMapEventsThread
        extends Thread {
            /** Index of the first completion event to request on the next poll. */
            private IntWritable fromEventId = new IntWritable(0);
            /** Pause between two polls, in milliseconds. */
            private static final long SLEEP_TIME = 1000L;

            public GetMapEventsThread() {
                this.setDaemon(true);
                this.setName("Thread for polling Map Completion Events");
            }

            /**
             * Poll loop: fetch events, sleep, repeat. An interrupt ends the thread; any other
             * throwable is handed to {@code reportFatalError} and the loop continues.
             */
            @Override
            public void run() {
                LOG.info(ReduceCopier.this.reduceTask.getTaskID() + " Thread started: " + this.getName());
                do {
                    try {
                        int freshOutputs = this.getMapCompletionEvents();
                        if (LOG.isDebugEnabled() && freshOutputs > 0) {
                            LOG.debug(ReduceCopier.this.reduceTask.getTaskID() + ": " + "Got " + freshOutputs + " new map-outputs");
                        }
                        Thread.sleep(SLEEP_TIME);
                    }
                    catch (InterruptedException e) {
                        LOG.warn(ReduceCopier.this.reduceTask.getTaskID() + " GetMapEventsThread returning after an " + " interrupted exception");
                        return;
                    }
                    catch (Throwable t) {
                        String msg = ReduceCopier.this.reduceTask.getTaskID() + " GetMapEventsThread Ignoring exception : " + StringUtils.stringifyException(t);
                        ReduceTask.this.reportFatalError(ReduceTask.this.getTaskID(), t, msg);
                    }
                } while (!ReduceCopier.this.exitGetMapEvents);
                LOG.info("GetMapEventsThread exiting");
            }

            /**
             * Fetches the next batch of map completion events and applies each of them.
             *
             * @return number of events in the batch that reported a succeeded map attempt
             * @throws IOException if the umbilical call fails or a map output URL is malformed
             */
            private int getMapCompletionEvents() throws IOException {
                MapTaskCompletionEventsUpdate update = ReduceCopier.this.umbilical.getMapCompletionEvents(ReduceCopier.this.reduceTask.getJobID(), this.fromEventId.get(), 10000, ReduceCopier.this.reduceTask.getTaskID(), ReduceTask.this.jvmContext);
                TaskCompletionEvent[] events = update.getMapTaskCompletionEvents();
                if (update.shouldReset()) {
                    // Start over: forget the event index and everything learned so far.
                    this.fromEventId.set(0);
                    ReduceCopier.this.obsoleteMapIds.clear();
                    ReduceCopier.this.mapLocations.clear();
                }
                this.fromEventId.set(this.fromEventId.get() + events.length);
                int succeededMaps = 0;
                for (int i = 0; i < events.length; ++i) {
                    TaskCompletionEvent event = events[i];
                    switch (event.getTaskStatus()) {
                        case SUCCEEDED:
                            this.rememberMapOutputLocation(event);
                            ++succeededMaps;
                            break;
                        case FAILED:
                        case KILLED:
                        case OBSOLETE:
                            ReduceCopier.this.obsoleteMapIds.add(event.getTaskAttemptId());
                            LOG.info("Ignoring obsolete output of " + event.getTaskStatus() + " map-task: '" + event.getTaskAttemptId() + "'");
                            break;
                        case TIPFAILED:
                            // The map's TIP failed for good: mark it as done so it is not awaited.
                            ReduceCopier.this.copiedMapOutputs.add(event.getTaskAttemptId().getTaskID());
                            LOG.info("Ignoring output of failed map TIP: '" + event.getTaskAttemptId() + "'");
                            break;
                    }
                }
                return succeededMaps;
            }

            /** Adds the fetch location announced by a SUCCEEDED event to the per-host location list. */
            private void rememberMapOutputLocation(TaskCompletionEvent event) throws IOException {
                URI trackerUri = URI.create(event.getTaskTrackerHttp());
                String host = trackerUri.getHost();
                TaskAttemptID attemptId = event.getTaskAttemptId();
                URL outputUrl = new URL(event.getTaskTrackerHttp() + "/mapOutput?job=" + attemptId.getJobID() + "&map=" + attemptId + "&reduce=" + ReduceTask.this.getPartition());
                List<MapOutputLocation> hostLocations = (List<MapOutputLocation>)ReduceCopier.this.mapLocations.get(host);
                if (hostLocations == null) {
                    hostLocations = Collections.synchronizedList(new LinkedList<MapOutputLocation>());
                    ReduceCopier.this.mapLocations.put(host, hostLocations);
                }
                hostLocations.add(new MapOutputLocation(attemptId, host, outputUrl));
            }
        }

        /**
         * Daemon thread that merges the map outputs currently held in memory into a single
         * on-disk file each time the RAM manager signals that there is data to merge.
         */
        private class InMemFSMergeThread
        extends Thread {
            public InMemFSMergeThread() {
                this.setName("Thread for merging in memory files");
                this.setDaemon(true);
            }

            /**
             * Runs in-memory merges until {@code ramManager.waitForDataToMerge()} reports that
             * the thread should exit. An {@link Exception} is stored in {@code mergeThrowable};
             * any other throwable is passed to {@code reportFatalError}.
             */
            @Override
            public void run() {
                LOG.info((Object)(ReduceCopier.this.reduceTask.getTaskID() + " Thread started: " + this.getName()));
                try {
                    boolean exit = false;
                    do {
                        exit = ReduceCopier.this.ramManager.waitForDataToMerge();
                        if (!exit) {
                            this.doInMemMerge();
                        }
                    } while (!exit);
                }
                catch (Exception e) {
                    LOG.warn((Object)(ReduceCopier.this.reduceTask.getTaskID() + " Merge of the inmemory files threw an exception: " + StringUtils.stringifyException(e)));
                    ReduceCopier.this.mergeThrowable = e;
                }
                catch (Throwable t) {
                    String msg = ReduceTask.this.getTaskID() + " : Failed to merge in memory" + StringUtils.stringifyException(t);
                    ReduceTask.this.reportFatalError(ReduceTask.this.getTaskID(), t, msg);
                }
            }

            /**
             * Merges all in-memory map outputs into one file on the local file system (running
             * the combiner when one is configured) and registers that file as an on-disk output.
             *
             * <p>On failure the writer is closed on a best-effort basis and the partially written
             * file is deleted before the error is rethrown, so neither the stream nor the file
             * is left behind.
             *
             * @throws IOException wrapping the original failure ("Intermediate merge failed")
             */
            private void doInMemMerge() throws IOException {
                if (ReduceCopier.this.mapOutputsFilesInMemory.size() == 0) {
                    return;
                }
                // Map id of the first in-memory output; passed to getInputFileForWrite below.
                TaskID mapId = ((MapOutput)ReduceCopier.this.mapOutputsFilesInMemory.get(0)).mapId;
                ArrayList inMemorySegments = new ArrayList();
                long mergeOutputSize = ReduceCopier.this.createInMemorySegments(inMemorySegments, 0L);
                int noInMemorySegments = inMemorySegments.size();
                Path outputPath = ReduceTask.this.mapOutputFile.getInputFileForWrite(mapId, mergeOutputSize);
                IFile.Writer writer = new IFile.Writer(ReduceTask.this.conf, ReduceCopier.this.rfs, outputPath, ReduceTask.this.conf.getMapOutputKeyClass(), ReduceTask.this.conf.getMapOutputValueClass(), ReduceTask.this.codec, null);
                RawKeyValueIterator rIter = null;
                // Set right before the regular close() so a failure never closes the writer twice.
                boolean closeAttempted = false;
                try {
                    LOG.info((Object)("Initiating in-memory merge with " + noInMemorySegments + " segments..."));
                    rIter = Merger.merge(ReduceTask.this.conf, ReduceCopier.this.rfs, ReduceTask.this.conf.getMapOutputKeyClass(), ReduceTask.this.conf.getMapOutputValueClass(), inMemorySegments, inMemorySegments.size(), new Path(ReduceCopier.this.reduceTask.getTaskID().toString()), ReduceTask.this.conf.getOutputKeyComparator(), ReduceCopier.this.reporter, ReduceTask.this.spilledRecordsCounter, null);
                    if (ReduceCopier.this.combinerRunner == null) {
                        Merger.writeFile(rIter, writer, ReduceCopier.this.reporter, ReduceTask.this.conf);
                    } else {
                        ReduceCopier.this.combineCollector.setWriter(writer);
                        ReduceCopier.this.combinerRunner.combine(rIter, ReduceCopier.this.combineCollector);
                    }
                    closeAttempted = true;
                    writer.close();
                    LOG.info((Object)(ReduceCopier.this.reduceTask.getTaskID() + " Merge of the " + noInMemorySegments + " files in-memory complete." + " Local file is " + outputPath + " of size " + ReduceCopier.this.localFileSys.getFileStatus(outputPath).getLen()));
                }
                catch (Exception e) {
                    // Release the writer first, then remove the partial output file.
                    if (!closeAttempted) {
                        this.closeAfterFailure(writer, outputPath);
                    }
                    ReduceCopier.this.localFileSys.delete(outputPath, true);
                    throw (IOException)new IOException("Intermediate merge failed").initCause(e);
                }
                FileStatus status = ReduceCopier.this.localFileSys.getFileStatus(outputPath);
                // addToMapOutputFilesOnDisk takes the lock on the on-disk set itself.
                ReduceCopier.this.addToMapOutputFilesOnDisk(status);
            }

            /** Closes {@code writer} after a failed merge; a close failure is logged, never thrown. */
            private void closeAfterFailure(IFile.Writer writer, Path outputPath) {
                try {
                    writer.close();
                }
                catch (Throwable closeFailure) {
                    // Must not mask the merge failure that brought us here.
                    LOG.warn((Object)(ReduceCopier.this.reduceTask.getTaskID() + " Could not close writer for " + outputPath + " after a failed in-memory merge: " + StringUtils.stringifyException(closeFailure)));
                }
            }
        }

        private class LocalFSMerger
        extends Thread {
            private LocalFileSystem localFileSys;

            public LocalFSMerger(LocalFileSystem fs) {
                this.localFileSys = fs;
                this.setName("Thread for merging on-disk files");
                this.setDaemon(true);
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                try {
                    LOG.info((Object)(ReduceCopier.this.reduceTask.getTaskID() + " Thread started: " + this.getName()));
                    while (!ReduceCopier.this.exitLocalFSMerge) {
                        SortedSet sortedSet = ReduceTask.this.mapOutputFilesOnDisk;
                        synchronized (sortedSet) {
                            while (!ReduceCopier.this.exitLocalFSMerge && ReduceTask.this.mapOutputFilesOnDisk.size() < 2 * ReduceCopier.this.ioSortFactor - 1) {
                                LOG.info((Object)(ReduceCopier.this.reduceTask.getTaskID() + " Thread waiting: " + this.getName()));
                                ReduceTask.this.mapOutputFilesOnDisk.wait();
                            }
                        }
                        if (ReduceCopier.this.exitLocalFSMerge) break;
                        ArrayList<Path> mapFiles = new ArrayList<Path>();
                        long approxOutputSize = 0L;
                        int bytesPerSum = ReduceCopier.this.reduceTask.getConf().getInt("io.bytes.per.checksum", 512);
                        LOG.info((Object)(ReduceCopier.this.reduceTask.getTaskID() + "We have  " + ReduceTask.this.mapOutputFilesOnDisk.size() + " map outputs on disk. " + "Triggering merge of " + ReduceCopier.this.ioSortFactor + " files"));
                        SortedSet sortedSet2 = ReduceTask.this.mapOutputFilesOnDisk;
                        synchronized (sortedSet2) {
                            for (int i = 0; i < ReduceCopier.this.ioSortFactor; ++i) {
                                FileStatus filestatus = (FileStatus)ReduceTask.this.mapOutputFilesOnDisk.first();
                                ReduceTask.this.mapOutputFilesOnDisk.remove(filestatus);
                                mapFiles.add(filestatus.getPath());
                                approxOutputSize += filestatus.getLen();
                            }
                        }
                        if (mapFiles.size() == 0) {
                            return;
                        }
                        approxOutputSize += ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);
                        Path outputPath = ReduceTask.this.lDirAlloc.getLocalPathForWrite(((Path)mapFiles.get(0)).toString(), approxOutputSize, ReduceTask.this.conf).suffix(".merged");
                        IFile.Writer writer = new IFile.Writer(ReduceTask.this.conf, ReduceCopier.this.rfs, outputPath, ReduceTask.this.conf.getMapOutputKeyClass(), ReduceTask.this.conf.getMapOutputValueClass(), ReduceTask.this.codec, null);
                        RawKeyValueIterator iter = null;
                        Path tmpDir = new Path(ReduceCopier.this.reduceTask.getTaskID().toString());
                        try {
                            iter = Merger.merge((Configuration)ReduceTask.this.conf, ReduceCopier.this.rfs, ReduceTask.this.conf.getMapOutputKeyClass(), ReduceTask.this.conf.getMapOutputValueClass(), ReduceTask.this.codec, mapFiles.toArray(new Path[mapFiles.size()]), true, ReduceCopier.this.ioSortFactor, tmpDir, ReduceTask.this.conf.getOutputKeyComparator(), ReduceCopier.this.reporter, ReduceTask.this.spilledRecordsCounter, null);
                            Merger.writeFile(iter, writer, ReduceCopier.this.reporter, ReduceTask.this.conf);
                            writer.close();
                        }
                        catch (Exception e) {
                            this.localFileSys.delete(outputPath, true);
                            throw new IOException(StringUtils.stringifyException(e));
                        }
                        SortedSet sortedSet3 = ReduceTask.this.mapOutputFilesOnDisk;
                        synchronized (sortedSet3) {
                            ReduceCopier.this.addToMapOutputFilesOnDisk(this.localFileSys.getFileStatus(outputPath));
                        }
                        LOG.info((Object)(ReduceCopier.this.reduceTask.getTaskID() + " Finished merging " + mapFiles.size() + " map output files on disk of total-size " + approxOutputSize + "." + " Local output file is " + outputPath + " of size " + this.localFileSys.getFileStatus(outputPath).getLen()));
                    }
                }
                catch (Exception e) {
                    LOG.warn((Object)(ReduceCopier.this.reduceTask.getTaskID() + " Merging of the local FS files threw an exception: " + StringUtils.stringifyException(e)));
                    if (ReduceCopier.this.mergeThrowable == null) {
                        ReduceCopier.this.mergeThrowable = e;
                    }
                }
                catch (Throwable t) {
                    String msg = ReduceTask.this.getTaskID() + " : Failed to merge on the local FS" + StringUtils.stringifyException(t);
                    ReduceTask.this.reportFatalError(ReduceTask.this.getTaskID(), t, msg);
                }
            }
        }

        /**
         * Adapts a {@link RawKeyValueIterator} to the {@code IFile.Reader} interface so that the
         * output of one merge can be fed into another merge as a segment.
         */
        class RawKVIteratorReader
        extends IFile.Reader<K, V> {
            private final RawKeyValueIterator kvIter;

            /**
             * @param kvIter iterator supplying the records
             * @param size length passed to the {@code IFile.Reader} constructor
             */
            public RawKVIteratorReader(RawKeyValueIterator kvIter, long size) throws IOException {
                super(null, null, size, null, ReduceTask.this.spilledRecordsCounter);
                this.kvIter = kvIter;
            }

            /**
             * Advances the wrapped iterator and points {@code key} and {@code value} at the
             * unread part of its current key and value buffers.
             *
             * @return {@code false} once the wrapped iterator is exhausted
             */
            @Override
            public boolean next(DataInputBuffer key, DataInputBuffer value) throws IOException {
                if (!this.kvIter.next()) {
                    return false;
                }
                DataInputBuffer sourceKey = this.kvIter.getKey();
                DataInputBuffer sourceValue = this.kvIter.getValue();
                int keyBytes = this.pointAt(sourceKey, key);
                int valueBytes = this.pointAt(sourceValue, value);
                this.bytesRead += (long)(keyBytes + valueBytes);
                return true;
            }

            /** Resets {@code target} onto the unread bytes of {@code source}; returns their count. */
            private int pointAt(DataInputBuffer source, DataInputBuffer target) {
                int offset = source.getPosition();
                int remaining = source.getLength() - offset;
                target.reset(source.getData(), offset, remaining);
                return remaining;
            }

            /** Position is reported as the number of key and value bytes handed out so far. */
            @Override
            public long getPosition() throws IOException {
                return this.bytesRead;
            }

            /** Closes the wrapped iterator. */
            @Override
            public void close() throws IOException {
                this.kvIter.close();
            }
        }

        private class MapOutputCopier
        extends Thread {
            private static final int UNIT_CONNECT_TIMEOUT = 30000;
            private static final int DEFAULT_READ_TIMEOUT = 180000;
            private final int shuffleConnectionTimeout;
            private final int shuffleReadTimeout;
            private MapOutputLocation currentLocation = null;
            private int id = ReduceCopier.access$908(ReduceCopier.this);
            private Reporter reporter;
            private boolean readError = false;
            private CompressionCodec codec = null;
            private Decompressor decompressor = null;
            private final SecretKey jobTokenSecret;

            /**
             * Creates a copier thread named after the reduce task and this copier's id, reads
             * the shuffle connect/read timeouts from {@code job}, and, when map output
             * compression is enabled, sets up the codec and borrows a decompressor from the
             * {@code CodecPool}.
             *
             * @param job job configuration
             * @param reporter progress reporter kept for later use by this copier
             * @param jobTokenSecret secret key kept for later use by this copier
             */
            public MapOutputCopier(JobConf job, Reporter reporter, SecretKey jobTokenSecret) {
                String threadName = "MapOutputCopier " + ReduceCopier.this.reduceTask.getTaskID() + "." + this.id;
                this.setName(threadName);
                LOG.debug(this.getName() + " created");
                this.jobTokenSecret = jobTokenSecret;
                this.reporter = reporter;
                this.shuffleConnectionTimeout = job.getInt("mapreduce.reduce.shuffle.connect.timeout", 180000);
                this.shuffleReadTimeout = job.getInt("mapreduce.reduce.shuffle.read.timeout", DEFAULT_READ_TIMEOUT);
                if (!job.getCompressMapOutput()) {
                    // Uncompressed map output: codec and decompressor stay null.
                    return;
                }
                Class<? extends CompressionCodec> codecClass = job.getMapOutputCompressorClass(DefaultCodec.class);
                this.codec = ReflectionUtils.newInstance(codecClass, job);
                this.decompressor = CodecPool.getDecompressor(this.codec);
            }

            /**
             * Aborts the copy currently in progress, if any, by publishing a failed result
             * (size {@code -1}, {@code OTHER_ERROR}) for it.
             *
             * @return {@code true} if a copy was in progress and has been failed,
             *         {@code false} if this copier was idle
             */
            public synchronized boolean fail() {
                boolean copyInProgress = this.currentLocation != null;
                if (copyInProgress) {
                    this.finish(-1L, CopyOutputErrorType.OTHER_ERROR);
                }
                return copyInProgress;
            }

            /**
             * Returns the map output location this copier is currently working on.
             *
             * @return the current location, or {@code null} when no copy is in progress
             */
            public synchronized MapOutputLocation getLocation() {
                return this.currentLocation;
            }

            /**
             * Marks {@code loc} as the location currently being copied.
             * Note: this overloads, and does not override, {@link Thread#start()}.
             *
             * @param loc the map output location about to be copied
             */
            private synchronized void start(MapOutputLocation loc) {
                this.currentLocation = loc;
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            /**
             * Publishes the outcome of the current copy to {@code copyResults}, wakes one thread
             * waiting on that list, and marks this copier as idle. Does nothing when no copy is
             * in progress.
             *
             * @param size value recorded in the copy result ({@code -1} is used for failures)
             * @param error error classification recorded in the copy result
             */
            private synchronized void finish(long size, CopyOutputErrorType error) {
                if (this.currentLocation == null) {
                    return;
                }
                LOG.debug(this.getName() + " finishing " + this.currentLocation + " =" + size);
                synchronized (ReduceCopier.this.copyResults) {
                    ReduceCopier.this.copyResults.add(new CopyResult(this.currentLocation, size, error));
                    ReduceCopier.this.copyResults.notify();
                }
                this.currentLocation = null;
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             * Unable to fully structure code
             */
            /**
             * Copier main loop: waits for a scheduled map output location, copies it, and
             * publishes the result through {@code finish}.
             *
             * <p>NOTE(review): the decompiler could not structure this method (see the label and
             * the {@code ** GOTO} below), so the text is not valid Java as it stands. The
             * {@code access$NNNN} calls are synthetic accessors for private members of the
             * enclosing classes; {@code access$1200} returns {@code scheduledCopies}, the others
             * are not defined in this part of the file - presumably the logger
             * ({@code access$200}), the reduce task ({@code access$1000}), the shuffle metrics
             * ({@code access$1300}) and the umbilical ({@code access$1400}); confirm against the
             * original source.
             */
            @Override
            public void run() {
                while (true) {
                    try {
                        while (true) lbl-1000:
                        // 4 sources

                        {
                            loc = null;
                            size = -1L;
                            // Block until a location has been scheduled, then take the first one.
                            var4_8 = ReduceCopier.access$1200(ReduceCopier.this);
                            synchronized (var4_8) {
                                while (ReduceCopier.access$1200(ReduceCopier.this).isEmpty()) {
                                    ReduceCopier.access$1200(ReduceCopier.this).wait();
                                }
                                loc = (MapOutputLocation)ReduceCopier.access$1200(ReduceCopier.this).remove(0);
                            }
                            // Assume failure until the copy has completed.
                            error = CopyOutputErrorType.OTHER_ERROR;
                            this.readError = false;
                            try {
                                ReduceCopier.access$1300(ReduceCopier.this).threadBusy();
                                this.start(loc);
                                size = this.copyOutput(loc);
                                ReduceCopier.access$1300(ReduceCopier.this).successFetch();
                                error = CopyOutputErrorType.NO_ERROR;
                            }
                            catch (IOException e) {
                                ReduceTask.access$200().warn((Object)(ReduceCopier.access$1000(ReduceCopier.this).getTaskID() + " copy failed: " + loc.getTaskAttemptId() + " from " + loc.getHost()));
                                ReduceTask.access$200().warn((Object)StringUtils.stringifyException(e));
                                ReduceCopier.access$1300(ReduceCopier.this).failedFetch();
                                // readError is reset before each copy; if it is set now, report a read error.
                                if (this.readError) {
                                    error = CopyOutputErrorType.READ_ERROR;
                                }
                                size = -1L;
                            }
                            finally {
                                // Always publish a result for the location, success or failure.
                                ReduceCopier.access$1300(ReduceCopier.this).threadFree();
                                this.finish(size, error);
                                continue;
                            }
                            break;
                        }
                    }
                    catch (InterruptedException e) {
                        // An interrupt ends the copier thread.
                        break;
                    }
                    catch (FSError e) {
                        ReduceTask.access$200().error((Object)("Task: " + ReduceCopier.access$1000(ReduceCopier.this).getTaskID() + " - FSError: " + StringUtils.stringifyException(e)));
                        try {
                            ReduceCopier.access$1400(ReduceCopier.this).fsError(ReduceCopier.access$1000(ReduceCopier.this).getTaskID(), e.getMessage(), ReduceTask.this.jvmContext);
                        }
                        catch (IOException io) {
                            ReduceTask.access$200().error((Object)("Could not notify TT of FSError: " + StringUtils.stringifyException(io)));
                        }
                        continue;
                    }
                    catch (Throwable th) {
                        msg = ReduceTask.this.getTaskID() + " : Map output copy failure : " + StringUtils.stringifyException(th);
                        ReduceTask.this.reportFatalError(ReduceTask.this.getTaskID(), th, msg);
                        continue;
                    }
                    ** GOTO lbl-1000
                    break;
                }
                // Hand the decompressor obtained in the constructor back to the pool.
                if (this.decompressor != null) {
                    CodecPool.returnDecompressor(this.decompressor);
                }
            }

            /**
             * Fetches the output of one map task and registers it with the reduce copier.
             *
             * <p>Nothing is fetched when the map's output has already been copied or the
             * attempt is listed as obsolete. Otherwise the output is pulled through
             * {@code getMapOutput}; under the {@code ReduceTask} monitor it is then either
             * discarded (already copied by another thread, or empty), added to the
             * in-memory list, or renamed from its temporary name to
             * {@code map_<id>.out} in the same directory and added to the on-disk set.
             *
             * @param loc location of the map output to fetch
             * @return the compressed size reported for the fetched output, or {@code -2}
             *         when the output was already copied or is obsolete
             * @throws IOException if no map output could be obtained or the on-disk
             *         file could not be renamed to its final name
             * @throws InterruptedException propagated from {@code getMapOutput}
             */
            private long copyOutput(MapOutputLocation loc) throws IOException, InterruptedException {
                if (ReduceCopier.this.copiedMapOutputs.contains(loc.getTaskId()) || ReduceCopier.this.obsoleteMapIds.contains(loc.getTaskAttemptId())) {
                    return -2L;
                }
                TaskAttemptID reduceId = ReduceCopier.this.reduceTask.getTaskID();
                Path filename = new Path(String.format("%s/map_%d.out", "output", loc.getTaskId().getId()));
                // The copier id is appended so each fetch writes to its own temporary file.
                Path tmpMapOutput = new Path(filename + "-" + this.id);
                MapOutput mapOutput = this.getMapOutput(loc, tmpMapOutput, reduceId.getTaskID().getId());
                if (mapOutput == null) {
                    throw new IOException("Failed to fetch map-output for " + loc.getTaskAttemptId() + " from " + loc.getHost());
                }
                long bytes = mapOutput.compressedSize;
                synchronized (ReduceTask.this) {
                    // Re-check under the lock: the same map may have been copied meanwhile.
                    if (ReduceCopier.this.copiedMapOutputs.contains(loc.getTaskId())) {
                        mapOutput.discard();
                        return -2L;
                    }
                    if (bytes == 0L) {
                        // An empty output is recorded as copied but not kept.
                        try {
                            mapOutput.discard();
                        }
                        catch (IOException ioe) {
                            // Best effort: keep going, but log the cause like the other
                            // discard handlers in this class do.
                            LOG.info((Object)("Couldn't discard output of " + loc.getTaskId()), (Throwable)ioe);
                        }
                        this.noteCopiedMapOutput(loc.getTaskId());
                        return bytes;
                    }
                    if (mapOutput.inMemory) {
                        ReduceCopier.this.mapOutputsFilesInMemory.add(mapOutput);
                    } else {
                        // Move the temporary file to its final name next to where it was written.
                        tmpMapOutput = mapOutput.file;
                        filename = new Path(tmpMapOutput.getParent(), filename.getName());
                        if (!ReduceCopier.this.localFileSys.rename(tmpMapOutput, filename)) {
                            ReduceCopier.this.localFileSys.delete(tmpMapOutput, true);
                            throw new IOException("Failed to rename map output " + tmpMapOutput + " to " + filename);
                        }
                        synchronized (ReduceTask.this.mapOutputFilesOnDisk) {
                            ReduceCopier.this.addToMapOutputFilesOnDisk(ReduceCopier.this.localFileSys.getFileStatus(filename));
                        }
                    }
                    this.noteCopiedMapOutput(loc.getTaskId());
                }
                return bytes;
            }

            /**
             * Records {@code taskId} as copied and then tells the RAM manager how many
             * map outputs are left, computed as the total number of maps minus the
             * number of entries in the copied set.
             *
             * @param taskId the map task whose output has just been accounted for
             */
            private void noteCopiedMapOutput(TaskID taskId) {
                ReduceCopier.this.copiedMapOutputs.add(taskId);
                final int outstanding = ReduceTask.this.numMaps - ReduceCopier.this.copiedMapOutputs.size();
                ReduceCopier.this.ramManager.setNumCopiedMapOutputs(outstanding);
            }

            /**
             * Opens a connection to the given map output, validates the response
             * headers and shuffles the data either into memory or onto local disk.
             *
             * <p>The headers {@code from-map-task}, {@code Raw-Map-Output-Length},
             * {@code Map-Output-Length} and {@code for-reduce-task} are checked against
             * the expected map attempt and reduce partition. Any header that is missing,
             * unparsable or inconsistent causes a warning and a {@code null} return, and
             * the response stream is closed before returning so the connection is not
             * leaked. A missing or non-numeric header is handled like the other rejected
             * headers rather than escaping as an unchecked exception.
             *
             * @param mapOutputLoc where the map output is served from
             * @param filename relative path to use when the output goes to disk
             * @param reduce id of the reduce partition this task is fetching for
             * @return the fetched output, or {@code null} if the response headers were rejected
             * @throws IOException on connection, verification or transfer failure
             * @throws InterruptedException propagated from the in-memory shuffle
             */
            private MapOutput getMapOutput(MapOutputLocation mapOutputLoc, Path filename, int reduce) throws IOException, InterruptedException {
                URL url = mapOutputLoc.getOutputLocation();
                URLConnection connection = url.openConnection();
                InputStream input = this.setupSecureConnection(mapOutputLoc, connection);

                long decompressedLength = -1L;
                long compressedLength = -1L;
                boolean headerAccepted = false;
                try {
                    TaskAttemptID mapId;
                    try {
                        mapId = TaskAttemptID.forName(connection.getHeaderField("from-map-task"));
                    }
                    catch (IllegalArgumentException ia) {
                        LOG.warn((Object)"Invalid map id ", (Throwable)ia);
                        return null;
                    }
                    TaskAttemptID expectedMapId = mapOutputLoc.getTaskAttemptId();
                    // Compare from the expected side so a null parsed id (e.g. absent
                    // header) is rejected here instead of causing a NullPointerException.
                    if (!expectedMapId.equals(mapId)) {
                        LOG.warn((Object)("data from wrong map:" + mapId + " arrived to reduce task " + reduce + ", where as expected map output should be from " + expectedMapId));
                        return null;
                    }
                    try {
                        decompressedLength = Long.parseLong(connection.getHeaderField("Raw-Map-Output-Length"));
                        compressedLength = Long.parseLong(connection.getHeaderField("Map-Output-Length"));
                    }
                    catch (NumberFormatException nfe) {
                        LOG.warn((Object)(this.getName() + " missing or malformed length in map output header: id: " + mapId), (Throwable)nfe);
                        return null;
                    }
                    if (compressedLength < 0L || decompressedLength < 0L) {
                        LOG.warn((Object)(this.getName() + " invalid lengths in map output header: id: " + mapId + " compressed len: " + compressedLength + ", decompressed len: " + decompressedLength));
                        return null;
                    }
                    int forReduce;
                    try {
                        forReduce = Integer.parseInt(connection.getHeaderField("for-reduce-task"));
                    }
                    catch (NumberFormatException nfe) {
                        LOG.warn((Object)(this.getName() + " missing or malformed reduce id in map output header: id: " + mapId), (Throwable)nfe);
                        return null;
                    }
                    if (forReduce != reduce) {
                        LOG.warn((Object)("data for the wrong reduce: " + forReduce + " with compressed len: " + compressedLength + ", decompressed len: " + decompressedLength + " arrived to reduce task " + reduce));
                        return null;
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug((Object)("header: " + mapId + ", compressed len: " + compressedLength + ", decompressed len: " + decompressedLength));
                    }
                    headerAccepted = true;
                }
                finally {
                    // Every rejection path above abandons the response; release it.
                    if (!headerAccepted) {
                        try {
                            input.close();
                        }
                        catch (IOException closeFailure) {
                            LOG.info((Object)("Failed to close connection with: " + closeFailure));
                        }
                    }
                }

                // From here on the shuffle helpers own the stream and its cleanup.
                if (ReduceCopier.this.ramManager.canFitInMemory(decompressedLength)) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug((Object)("Shuffling " + decompressedLength + " bytes (" + compressedLength + " raw bytes) " + "into RAM from " + mapOutputLoc.getTaskAttemptId()));
                    }
                    return this.shuffleInMemory(mapOutputLoc, connection, input, (int)decompressedLength, (int)compressedLength);
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)("Shuffling " + decompressedLength + " bytes (" + compressedLength + " raw bytes) " + "into Local-FS from " + mapOutputLoc.getTaskAttemptId()));
                }
                return this.shuffleToDisk(mapOutputLoc, input, filename, compressedLength);
            }

            /**
             * Sends the hashed request URL in the {@code UrlHash} request property,
             * opens the response stream and verifies the {@code ReplyHash} header
             * returned by the server against the job token secret.
             *
             * <p>If the reply hash is missing or verification fails, the already
             * opened stream is closed before the exception propagates, so a rejected
             * response does not leak its connection.
             *
             * @param mapOutputLoc location being fetched (not read by this method)
             * @param connection the not-yet-connected connection to the map output URL
             * @return the response stream, positioned after a verified header
             * @throws IOException if connecting fails, the reply hash is absent, or
             *         {@code SecureShuffleUtils.verifyReply} rejects it
             */
            private InputStream setupSecureConnection(MapOutputLocation mapOutputLoc, URLConnection connection) throws IOException {
                String msgToEncode = SecureShuffleUtils.buildMsgFrom(connection.getURL());
                String encHash = SecureShuffleUtils.hashFromString(msgToEncode, this.jobTokenSecret);
                connection.setRequestProperty("UrlHash", encHash);
                InputStream input = this.getInputStream(connection, this.shuffleConnectionTimeout, this.shuffleReadTimeout);
                boolean verified = false;
                try {
                    String replyHash = connection.getHeaderField("ReplyHash");
                    if (replyHash == null) {
                        throw new IOException("security validation of TT Map output failed");
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug((Object)("url=" + msgToEncode + ";encHash=" + encHash + ";replyHash=" + replyHash));
                    }
                    SecureShuffleUtils.verifyReply(replyHash, encHash, this.jobTokenSecret);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug((Object)("for url=" + msgToEncode + " sent hash and receievd reply"));
                    }
                    verified = true;
                    return input;
                }
                finally {
                    // Only reached with verified == false when an exception is in flight.
                    if (!verified) {
                        try {
                            input.close();
                        }
                        catch (IOException closeFailure) {
                            LOG.info((Object)("Failed to close connection with: " + closeFailure));
                        }
                    }
                }
            }

            /**
             * Connects {@code connection} and returns its input stream.
             *
             * <p>The total connect budget is spent in slices of at most 30000 ms: each
             * failed {@code connect()} consumes one slice, the final slice is shortened
             * to whatever budget remains, and the last failure is rethrown once the
             * budget reaches zero. A budget of zero means a single attempt with a
             * connect timeout of 0. A failure while obtaining the stream itself sets
             * {@code readError} before rethrowing.
             *
             * @param connection the connection to open
             * @param connectionTimeout total connect budget in milliseconds; must not be negative
             * @param readTimeout read timeout in milliseconds, passed to the connection unchanged
             * @return the connection's input stream
             * @throws IOException if the timeout is negative, connecting ultimately
             *         fails, or the stream cannot be obtained
             */
            private InputStream getInputStream(URLConnection connection, int connectionTimeout, int readTimeout) throws IOException {
                if (connectionTimeout < 0) {
                    throw new IOException("Invalid timeout [timeout = " + connectionTimeout + " ms]");
                }
                int slice = Math.min(connectionTimeout, 30000);
                int remaining = connectionTimeout;
                connection.setReadTimeout(readTimeout);
                connection.setConnectTimeout(slice);

                boolean connected = false;
                while (!connected) {
                    try {
                        connection.connect();
                        connected = true;
                    }
                    catch (IOException connectFailure) {
                        remaining -= slice;
                        if (remaining == 0) {
                            // Budget exhausted: surface the most recent failure.
                            throw connectFailure;
                        }
                        if (remaining < slice) {
                            // Last attempt gets only what is left of the budget.
                            slice = remaining;
                            connection.setConnectTimeout(slice);
                        }
                    }
                }

                try {
                    return connection.getInputStream();
                }
                catch (IOException streamFailure) {
                    this.readError = true;
                    throw streamFailure;
                }
            }

            /**
             * Reads a map output of {@code mapOutputLength} bytes into a byte array.
             *
             * <p>Memory is reserved through the RAM manager first. When the reservation
             * reports that the passed stream is no longer usable, the connection is
             * reopened and re-verified. The stream is wrapped in an
             * {@code IFileInputStream} and, when a codec is configured, in the codec's
             * decompressing stream. On a read failure or a short read the reservation is
             * released and the output discarded before the exception is thrown; a read
             * failure additionally sets {@code readError}.
             *
             * @param mapOutputLoc source of the map output
             * @param connection connection the stream came from; replaced on reopen
             * @param input response stream to read from
             * @param mapOutputLength number of bytes expected after unwrapping
             * @param compressedLength length handed to {@code IFileInputStream} and stored as the output's compressed size
             * @return the in-memory map output
             * @throws IOException if reopening, reading or length validation fails
             * @throws InterruptedException propagated from the RAM reservation
             */
            private MapOutput shuffleInMemory(MapOutputLocation mapOutputLoc, URLConnection connection, InputStream input, int mapOutputLength, int compressedLength) throws IOException, InterruptedException {
                if (!ReduceCopier.this.ramManager.reserve(mapOutputLength, input)) {
                    // The stream did not survive the reservation; fetch a fresh one.
                    try {
                        connection = mapOutputLoc.getOutputLocation().openConnection();
                        input = this.setupSecureConnection(mapOutputLoc, connection);
                    }
                    catch (IOException reopenFailure) {
                        LOG.info((Object)("Failed reopen connection to fetch map-output from " + mapOutputLoc.getHost()));
                        ReduceCopier.this.ramManager.closeInMemoryFile(mapOutputLength);
                        ReduceCopier.this.ramManager.unreserve(mapOutputLength);
                        throw reopenFailure;
                    }
                }

                input = new IFileInputStream(input, compressedLength);
                if (this.codec != null) {
                    this.decompressor.reset();
                    input = this.codec.createInputStream(input, this.decompressor);
                }

                final byte[] buffer = new byte[mapOutputLength];
                MapOutput mapOutput = new MapOutput(mapOutputLoc.getTaskId(), mapOutputLoc.getTaskAttemptId(), buffer, compressedLength);
                int bytesRead = 0;
                try {
                    // Keep reading until the stream yields nothing more (or the buffer is full).
                    for (int n = input.read(buffer, 0, buffer.length); n > 0; n = input.read(buffer, bytesRead, buffer.length - bytesRead)) {
                        ReduceCopier.this.shuffleClientMetrics.inputBytes(n);
                        this.reporter.progress();
                        bytesRead += n;
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug((Object)("Read " + bytesRead + " bytes from map-output for " + mapOutputLoc.getTaskAttemptId()));
                    }
                    input.close();
                }
                catch (IOException readFailure) {
                    LOG.info((Object)("Failed to shuffle from " + mapOutputLoc.getTaskAttemptId()), (Throwable)readFailure);
                    ReduceCopier.this.ramManager.closeInMemoryFile(mapOutputLength);
                    ReduceCopier.this.ramManager.unreserve(mapOutputLength);
                    try {
                        mapOutput.discard();
                    }
                    catch (IOException ignored) {
                        LOG.info((Object)("Failed to discard map-output from " + mapOutputLoc.getTaskAttemptId()), (Throwable)ignored);
                    }
                    IOUtils.cleanup(LOG, input);
                    this.readError = true;
                    throw readFailure;
                }

                ReduceCopier.this.ramManager.closeInMemoryFile(mapOutputLength);
                if (bytesRead != mapOutputLength) {
                    // Short read: give the memory back and reject the output.
                    ReduceCopier.this.ramManager.unreserve(mapOutputLength);
                    try {
                        mapOutput.discard();
                    }
                    catch (IOException ignored) {
                        LOG.info((Object)("Failed to discard map-output from " + mapOutputLoc.getTaskAttemptId()), (Throwable)ignored);
                    }
                    throw new IOException("Incomplete map output received for " + mapOutputLoc.getTaskAttemptId() + " from " + mapOutputLoc.getOutputLocation() + " (" + bytesRead + " instead of " + mapOutputLength + ")");
                }

                if (LOG.isDebugEnabled() && mapOutputLength > 0) {
                    // Debug aid: print the first two variable-length ints of the data.
                    DataInputBuffer peek = new DataInputBuffer();
                    peek.reset(buffer, 0, buffer.length);
                    LOG.debug((Object)("Rec #1 from " + mapOutputLoc.getTaskAttemptId() + " -> (" + WritableUtils.readVInt(peek) + ", " + WritableUtils.readVInt(peek) + ") from " + mapOutputLoc.getHost()));
                }
                return mapOutput;
            }

            /**
             * Streams a map output into a file on the local file system.
             *
             * <p>A local path is obtained from the directory allocator and the response
             * is copied to it in 65536-byte chunks, reporting progress after each chunk.
             * A failure while reading from {@code input} sets {@code readError}. On any
             * I/O failure, or when the number of bytes copied differs from
             * {@code mapOutputLength}, the partially written output is discarded and an
             * {@code IOException} is thrown.
             *
             * @param mapOutputLoc source of the map output
             * @param input response stream to read from
             * @param filename relative path used to pick the local file
             * @param mapOutputLength number of bytes expected on the stream
             * @return the on-disk map output
             * @throws IOException if copying fails or the output is incomplete
             */
            private MapOutput shuffleToDisk(MapOutputLocation mapOutputLoc, InputStream input, Path filename, long mapOutputLength) throws IOException {
                Path localFilename = ReduceTask.this.lDirAlloc.getLocalPathForWrite(filename.toUri().getPath(), mapOutputLength, ReduceTask.this.conf);
                MapOutput mapOutput = new MapOutput(mapOutputLoc.getTaskId(), mapOutputLoc.getTaskAttemptId(), ReduceTask.this.conf, ReduceCopier.this.localFileSys.makeQualified(localFilename), mapOutputLength);
                FSDataOutputStream output = null;
                long bytesRead = 0L;
                try {
                    output = ReduceCopier.this.rfs.create(localFilename);
                    final byte[] chunk = new byte[65536];
                    while (true) {
                        int n;
                        try {
                            n = input.read(chunk, 0, chunk.length);
                        }
                        catch (IOException readFailure) {
                            // Distinguish failures of the remote read from local write failures.
                            this.readError = true;
                            throw readFailure;
                        }
                        if (n <= 0) {
                            break;
                        }
                        bytesRead += (long)n;
                        ReduceCopier.this.shuffleClientMetrics.inputBytes(n);
                        output.write(chunk, 0, n);
                        this.reporter.progress();
                    }
                    LOG.info((Object)("Read " + bytesRead + " bytes from map-output for " + mapOutputLoc.getTaskAttemptId()));
                    output.close();
                    input.close();
                }
                catch (IOException copyFailure) {
                    LOG.info((Object)("Failed to shuffle from " + mapOutputLoc.getTaskAttemptId()), (Throwable)copyFailure);
                    try {
                        mapOutput.discard();
                    }
                    catch (IOException ignored) {
                        LOG.info((Object)("Failed to discard map-output from " + mapOutputLoc.getTaskAttemptId()), (Throwable)ignored);
                    }
                    IOUtils.cleanup(LOG, input, output);
                    throw copyFailure;
                }

                if (bytesRead == mapOutputLength) {
                    return mapOutput;
                }

                // Short or long transfer: drop the file and report the mismatch.
                try {
                    mapOutput.discard();
                }
                catch (Exception discardFailure) {
                    LOG.info((Object)("Failed to discard map-output from " + mapOutputLoc.getTaskAttemptId()), (Throwable)discardFailure);
                }
                catch (Throwable t) {
                    String msg = ReduceTask.this.getTaskID() + " : Failed in shuffle to disk :" + StringUtils.stringifyException(t);
                    ReduceTask.this.reportFatalError(ReduceTask.this.getTaskID(), t, msg);
                }
                throw new IOException("Incomplete map output received for " + mapOutputLoc.getTaskAttemptId() + " from " + mapOutputLoc.getOutputLocation() + " (" + bytesRead + " instead of " + mapOutputLength + ")");
            }
        }

        /**
         * Tracks how much memory in-memory shuffle segments may use and coordinates
         * the copier threads (which reserve memory) with the thread that waits in
         * {@link #waitForDataToMerge()}.
         *
         * <p>Locking as visible in this class: {@code size} is only touched by the
         * {@code synchronized} methods {@code reserve}/{@code unreserve} (monitor of
         * this object); {@code fullSize}, {@code numClosed}, {@code numPendingRequests},
         * {@code numRequiredMapOutputs} and {@code closed} are only touched while
         * holding {@code dataAvailable}. Where both are needed, this object's monitor
         * is taken first.
         */
        class ShuffleRamManager
        implements RamManager {
            /** Fraction of {@code maxSize} a single segment must stay below to be shuffled into memory. */
            private static final float MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION = 0.25f;
            /** Fraction of copier threads that may be stalled in {@code reserve} before a merge is signalled. */
            private static final float MAX_STALLED_SHUFFLE_THREADS_FRACTION = 0.75f;
            private final long maxSize;
            private final long maxSingleShuffleLimit;
            /** Bytes currently reserved. */
            private long size = 0L;
            /** Lock and condition object for the merge-side counters; final because it is used as a monitor. */
            private final Object dataAvailable = new Object();
            /** Bytes belonging to segments that have been closed via {@code closeInMemoryFile}. */
            private long fullSize = 0L;
            /** Number of {@code reserve} callers currently waiting for memory. */
            private int numPendingRequests = 0;
            /** Value last passed to {@code setNumCopiedMapOutputs}. */
            private int numRequiredMapOutputs = 0;
            /** Closed segments not yet unreserved. */
            private int numClosed = 0;
            private boolean closed = false;

            /**
             * Derives the memory limits from the configuration.
             *
             * @param conf job configuration supplying the buffer percentage and the
             *        total memory (defaulting to the JVM maximum, capped at {@code Integer.MAX_VALUE})
             * @throws IOException if the configured buffer percentage is outside [0, 1]
             */
            public ShuffleRamManager(Configuration conf) throws IOException {
                float maxInMemCopyUse = conf.getFloat("mapred.job.shuffle.input.buffer.percent", 0.7f);
                if ((double)maxInMemCopyUse > 1.0 || (double)maxInMemCopyUse < 0.0) {
                    throw new IOException("mapred.job.shuffle.input.buffer.percent" + maxInMemCopyUse);
                }
                this.maxSize = (int)((float)conf.getInt("mapred.job.reduce.total.mem.bytes", (int)Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE)) * maxInMemCopyUse);
                this.maxSingleShuffleLimit = (long)((float)this.maxSize * MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION);
                LOG.info((Object)("ShuffleRamManager: MemoryLimit=" + this.maxSize + ", MaxSingleShuffleLimit=" + this.maxSingleShuffleLimit));
            }

            /**
             * Blocks until {@code requestedSize} bytes fit under the limit, then reserves them.
             *
             * <p>If the caller has to wait, {@code in} is closed first so the connection
             * is not held open while stalled.
             *
             * @param requestedSize number of bytes to reserve
             * @param in stream associated with the request; may be {@code null}
             * @return {@code true} if {@code in} was non-null and was not closed while waiting
             * @throws InterruptedException if interrupted while waiting for memory
             */
            @Override
            public synchronized boolean reserve(int requestedSize, InputStream in) throws InterruptedException {
                while (this.size + (long)requestedSize > this.maxSize) {
                    if (in != null) {
                        try {
                            in.close();
                        }
                        catch (IOException ie) {
                            LOG.info((Object)("Failed to close connection with: " + ie));
                        }
                        finally {
                            in = null;
                        }
                    }
                    // Announce the stalled request so the merge side can react.
                    synchronized (this.dataAvailable) {
                        ++this.numPendingRequests;
                        this.dataAvailable.notify();
                    }
                    try {
                        this.wait();
                    }
                    finally {
                        // Undo the announcement even when wait() is interrupted, so the
                        // pending count cannot stay inflated after this thread leaves.
                        synchronized (this.dataAvailable) {
                            --this.numPendingRequests;
                        }
                    }
                }
                this.size += (long)requestedSize;
                return in != null;
            }

            /**
             * Releases {@code requestedSize} reserved bytes, removes one closed segment
             * of that size from the merge-side accounting and wakes all waiting reservers.
             *
             * @param requestedSize number of bytes to release
             */
            @Override
            public synchronized void unreserve(int requestedSize) {
                this.size -= (long)requestedSize;
                synchronized (this.dataAvailable) {
                    this.fullSize -= (long)requestedSize;
                    --this.numClosed;
                }
                this.notifyAll();
            }

            /**
             * Waits until the manager is closed or one of the merge triggers holds:
             * the used fraction reached {@code maxInMemCopyPer} with at least two closed
             * segments; {@code maxInMemOutputs} is positive and that many segments are
             * closed; the number of stalled reservers reached the stalled-thread
             * fraction of {@code numCopiers}; or {@code numRequiredMapOutputs} is
             * non-zero and at least that many reservers are stalled.
             *
             * @return {@code true} if the manager has been closed
             * @throws InterruptedException if interrupted while waiting
             */
            public boolean waitForDataToMerge() throws InterruptedException {
                synchronized (this.dataAvailable) {
                    // Comparisons are kept in "<" form so float edge cases (NaN) behave
                    // exactly as a negated "<" would.
                    while (!this.closed
                            && (this.getPercentUsed() < ReduceCopier.this.maxInMemCopyPer || this.numClosed < 2)
                            && (ReduceCopier.this.maxInMemOutputs <= 0 || this.numClosed < ReduceCopier.this.maxInMemOutputs)
                            && (float)this.numPendingRequests < (float)ReduceCopier.this.numCopiers * MAX_STALLED_SHUFFLE_THREADS_FRACTION
                            && (0 == this.numRequiredMapOutputs || this.numPendingRequests < this.numRequiredMapOutputs)) {
                        this.dataAvailable.wait();
                    }
                    return this.closed;
                }
            }

            /**
             * Counts a segment of {@code requestedSize} bytes as closed and signals the merge waiter.
             *
             * @param requestedSize size of the segment that was closed
             */
            public void closeInMemoryFile(int requestedSize) {
                synchronized (this.dataAvailable) {
                    this.fullSize += (long)requestedSize;
                    ++this.numClosed;
                    this.dataAvailable.notify();
                }
            }

            /**
             * Stores the given count in {@code numRequiredMapOutputs} and signals the merge waiter.
             *
             * @param numRequiredMapOutputs the new count
             */
            public void setNumCopiedMapOutputs(int numRequiredMapOutputs) {
                synchronized (this.dataAvailable) {
                    this.numRequiredMapOutputs = numRequiredMapOutputs;
                    this.dataAvailable.notify();
                }
            }

            /** Marks the manager closed and signals the merge waiter. */
            public void close() {
                synchronized (this.dataAvailable) {
                    this.closed = true;
                    LOG.info((Object)"Closed ram manager");
                    this.dataAvailable.notify();
                }
            }

            /** Ratio of closed-segment bytes to the memory limit; callers hold {@code dataAvailable}. */
            private float getPercentUsed() {
                return (float)this.fullSize / (float)this.maxSize;
            }

            /**
             * @param requestedSize size of a candidate segment in bytes
             * @return {@code true} if the size is below both {@code Integer.MAX_VALUE}
             *         and the single-segment limit
             */
            boolean canFitInMemory(long requestedSize) {
                return requestedSize < Integer.MAX_VALUE && requestedSize < this.maxSingleShuffleLimit;
            }
        }

        /**
         * A fetched map output, held either as a byte array in memory or as a file.
         * Exactly one of {@code data} / {@code file} is set, as indicated by
         * {@code inMemory}.
         */
        private class MapOutput {
            final TaskID mapId;
            final TaskAttemptID mapAttemptId;
            /** Location of the output when stored as a file; {@code null} for in-memory outputs. */
            final Path file;
            /** Configuration used to resolve the file system of {@code file}; {@code null} for in-memory outputs. */
            final Configuration conf;
            /** Contents when held in memory; {@code null} for file outputs and after {@link #discard()}. */
            byte[] data;
            final boolean inMemory;
            long compressedSize;

            /** Creates a file-backed output of the given compressed size. */
            public MapOutput(TaskID mapId, TaskAttemptID mapAttemptId, Configuration conf, Path file, long size) {
                this.inMemory = false;
                this.data = null;
                this.mapId = mapId;
                this.mapAttemptId = mapAttemptId;
                this.conf = conf;
                this.file = file;
                this.compressedSize = size;
            }

            /** Creates an in-memory output backed by {@code data}. */
            public MapOutput(TaskID mapId, TaskAttemptID mapAttemptId, byte[] data, int compressedLength) {
                this.inMemory = true;
                this.data = data;
                this.mapId = mapId;
                this.mapAttemptId = mapAttemptId;
                this.conf = null;
                this.file = null;
                this.compressedSize = compressedLength;
            }

            /**
             * Drops the payload: clears the array reference for an in-memory output,
             * or recursively deletes the file otherwise.
             *
             * @throws IOException if resolving the file system or deleting the file fails
             */
            public void discard() throws IOException {
                if (!this.inMemory) {
                    this.file.getFileSystem(this.conf).delete(this.file, true);
                    return;
                }
                this.data = null;
            }
        }

        /**
         * Describes where the output of one map attempt can be fetched from: the
         * attempt id, the task id derived from it, the serving host and the URL.
         */
        private class MapOutputLocation {
            TaskAttemptID taskAttemptId;
            TaskID taskId;
            String ttHost;
            URL taskOutput;

            /**
             * @param taskAttemptId the map attempt; its task id is extracted here, so it must not be {@code null}
             * @param ttHost host serving the output
             * @param taskOutput URL of the output
             */
            public MapOutputLocation(TaskAttemptID taskAttemptId, String ttHost, URL taskOutput) {
                this.taskOutput = taskOutput;
                this.ttHost = ttHost;
                this.taskAttemptId = taskAttemptId;
                this.taskId = taskAttemptId.getTaskID();
            }

            /** @return the map attempt id */
            public TaskAttemptID getTaskAttemptId() {
                return taskAttemptId;
            }

            /** @return the task id derived from the attempt id at construction */
            public TaskID getTaskId() {
                return taskId;
            }

            /** @return the host serving the output */
            public String getHost() {
                return ttHost;
            }

            /** @return the URL of the output */
            public URL getOutputLocation() {
                return taskOutput;
            }
        }

        private class CopyResult {
            private final MapOutputLocation loc;
            private final long size;
            private static final int OBSOLETE = -2;
            private CopyOutputErrorType error = CopyOutputErrorType.NO_ERROR;

            CopyResult(MapOutputLocation loc, long size) {
                this.loc = loc;
                this.size = size;
            }

            CopyResult(MapOutputLocation loc, long size, CopyOutputErrorType error) {
                this.loc = loc;
                this.size = size;
                this.error = error;
            }

            public boolean getSuccess() {
                return this.size >= 0L;
            }

            public boolean isObsolete() {
                return this.size == -2L;
            }

            public long getSize() {
                return this.size;
            }

            public String getHost() {
                return this.loc.getHost();
            }

            public MapOutputLocation getLocation() {
                return this.loc;
            }

            public CopyOutputErrorType getError() {
                return this.error;
            }
        }

        /**
         * Accumulates shuffle statistics (bytes read, failed and successful fetches,
         * busy copier threads) and pushes them to the {@code shuffleInput} metrics
         * record whenever {@link #doUpdates} is invoked. All counters are guarded by
         * this object's monitor.
         */
        class ShuffleClientMetrics
        implements Updater {
            private MetricsRecord shuffleMetrics;
            private int numFailedFetches = 0;
            private int numSuccessFetches = 0;
            private long numBytes = 0L;
            private int numThreadsBusy = 0;

            /**
             * Creates the metrics record in the {@code mapred} context, tags it with
             * user, job and task identity, and registers this object as an updater.
             */
            ShuffleClientMetrics(JobConf conf) {
                MetricsContext context = MetricsUtil.getContext("mapred");
                MetricsRecord record = MetricsUtil.createRecord(context, "shuffleInput");
                this.shuffleMetrics = record;
                record.setTag("user", conf.getUser());
                record.setTag("jobName", conf.getJobName());
                record.setTag("jobId", ReduceTask.this.getJobID().toString());
                record.setTag("taskId", ReduceTask.this.getTaskID().toString());
                record.setTag("sessionId", conf.getSessionId());
                context.registerUpdater(this);
            }

            /** Adds {@code numBytes} to the bytes read since the last update. */
            public synchronized void inputBytes(long numBytes) {
                this.numBytes += numBytes;
            }

            /** Counts one failed fetch. */
            public synchronized void failedFetch() {
                ++numFailedFetches;
            }

            /** Counts one successful fetch. */
            public synchronized void successFetch() {
                ++numSuccessFetches;
            }

            /** Marks one more copier thread as busy. */
            public synchronized void threadBusy() {
                ++numThreadsBusy;
            }

            /** Marks one copier thread as no longer busy. */
            public synchronized void threadFree() {
                --numThreadsBusy;
            }

            /**
             * Publishes the counters accumulated since the previous call, sets the
             * busy-thread percentage, resets the accumulated counters (the busy-thread
             * count is kept) and then updates the record.
             */
            @Override
            public void doUpdates(MetricsContext unused) {
                synchronized (this) {
                    shuffleMetrics.incrMetric("shuffle_input_bytes", numBytes);
                    shuffleMetrics.incrMetric("shuffle_failed_fetches", numFailedFetches);
                    shuffleMetrics.incrMetric("shuffle_success_fetches", numSuccessFetches);
                    if (ReduceCopier.this.numCopiers == 0) {
                        // No copiers configured: avoid dividing by zero.
                        shuffleMetrics.setMetric("shuffle_fetchers_busy_percent", 0);
                    } else {
                        shuffleMetrics.setMetric("shuffle_fetchers_busy_percent", 100.0f * ((float)numThreadsBusy / (float)ReduceCopier.this.numCopiers));
                    }
                    numBytes = 0L;
                    numSuccessFetches = 0;
                    numFailedFetches = 0;
                }
                shuffleMetrics.update();
            }
        }
    }

    /**
     * Classifies the outcome of a map-output copy attempt, as recorded by the
     * copier thread's run loop.
     */
    private static enum CopyOutputErrorType {
        // Set once a copy attempt completed without an IOException.
        NO_ERROR,
        // Set when a copy failed and the copier's readError flag was raised.
        READ_ERROR,
        // Value assumed before each attempt; remains for failures that are not read errors.
        OTHER_ERROR;

    }

    /**
     * A {@code RecordWriter} decorator that forwards every call to the wrapped
     * writer and increments a counter by one after each record written.
     *
     * @param <K> key type
     * @param <V> value type
     */
    static class NewTrackingRecordWriter<K, V>
    extends RecordWriter<K, V> {
        private final RecordWriter<K, V> real;
        private final Counter outputRecordCounter;

        /**
         * @param real the writer that actually emits records
         * @param recordCounter counter incremented once per written record
         */
        NewTrackingRecordWriter(RecordWriter<K, V> real, Counter recordCounter) {
            this.outputRecordCounter = recordCounter;
            this.real = real;
        }

        /** Closes the wrapped writer. */
        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            real.close(context);
        }

        /** Writes through the wrapped writer, then counts the record. */
        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
            real.write(key, value);
            outputRecordCounter.increment(1L);
        }
    }

    /**
     * A {@link ReduceValuesIterator} that steps over key groups which are not selected by
     * the task's skip-range iterator. Skipped groups and records are counted, optionally
     * written to a sequence file under the skip output path, and the index of the group
     * about to be processed is reported through the umbilical.
     *
     * @param <KEY>   reduce input key type
     * @param <VALUE> reduce input value type
     */
    private class SkippingReduceValuesIterator<KEY, VALUE>
    extends ReduceValuesIterator<KEY, VALUE> {
        /** Yields the indices of the groups that should be processed. */
        private SortedRanges.SkipRangeIterator skipIt;
        private TaskUmbilicalProtocol umbilical;
        private Counters.Counter skipGroupCounter;
        private Counters.Counter skipRecCounter;
        /** Zero-based index of the current key group; -1 before the first group. */
        private long grpIndex;
        private Class<KEY> keyClass;
        private Class<VALUE> valClass;
        /** Created lazily on the first skipped record that has to be persisted. */
        private SequenceFile.Writer skipWriter;
        /** True when skipped records must be written out and a skip output path is configured. */
        private boolean toWriteSkipRecs;
        /** Result of the last {@code skipIt.hasNext()} check; false ends the iteration. */
        private boolean hasNext;
        private Task.TaskReporter reporter;

        /**
         * Builds the iterator and immediately skips ahead to the first group to process.
         *
         * @param in         raw key/value input
         * @param comparator comparator passed to the superclass
         * @param keyClass   key class, also used when creating the skip file
         * @param valClass   value class, also used when creating the skip file
         * @param conf       configuration consulted for the skip output path
         * @param reporter   source of the skip counters and progress callback for the skip writer
         * @param umbilical  channel used to report the next record range
         * @throws IOException if reading the input or writing skipped records fails
         */
        public SkippingReduceValuesIterator(RawKeyValueIterator in, RawComparator<KEY> comparator, Class<KEY> keyClass, Class<VALUE> valClass, Configuration conf, Task.TaskReporter reporter, TaskUmbilicalProtocol umbilical) throws IOException {
            super(in, comparator, keyClass, valClass, conf, reporter);
            this.grpIndex = -1L;
            this.umbilical = umbilical;
            this.skipGroupCounter = reporter.getCounter((Enum)Task.Counter.REDUCE_SKIPPED_GROUPS);
            this.skipRecCounter = reporter.getCounter((Enum)Task.Counter.REDUCE_SKIPPED_RECORDS);
            this.toWriteSkipRecs = ReduceTask.this.toWriteSkipRecs() && SkipBadRecords.getSkipOutputPath(conf) != null;
            this.keyClass = keyClass;
            this.valClass = valClass;
            this.reporter = reporter;
            this.skipIt = ReduceTask.this.getSkipRanges().skipRangeIterator();
            this.mayBeSkip();
        }

        /** Advances to the next key, then skips any groups that must not be processed. */
        @Override
        void nextKey() throws IOException {
            super.nextKey();
            this.mayBeSkip();
        }

        /** @return true while the input has more keys and the skip iterator has more groups */
        @Override
        boolean more() {
            return super.more() && this.hasNext;
        }

        /**
         * Moves the input forward until the current group index reaches the next index
         * supplied by the skip iterator, draining every group in between.
         *
         * @throws IOException if advancing the input or writing a skipped record fails
         */
        private void mayBeSkip() throws IOException {
            this.hasNext = this.skipIt.hasNext();
            if (!this.hasNext) {
                // With hasNext false, more() returns false and the remaining input is ignored.
                LOG.warn("Further groups got skipped.");
                return;
            }
            ++this.grpIndex;
            long nextGrpIndex = this.skipIt.next();
            long skip = 0L;
            long skipRec = 0L;
            while (this.grpIndex < nextGrpIndex && super.more()) {
                // Drain the values of the group being skipped. moveToNext() is used instead
                // of next() so the values are not counted by ReduceValuesIterator.next().
                while (this.hasNext()) {
                    // Typed as VALUE (not Object) so it can be passed to writeSkippedRec.
                    VALUE value = this.moveToNext();
                    if (this.toWriteSkipRecs) {
                        this.writeSkippedRec(this.getKey(), value);
                    }
                    ++skipRec;
                }
                super.nextKey();
                ++this.grpIndex;
                ++skip;
            }
            // Close the skip writer once all the ranges have been skipped.
            if (skip > 0L && this.skipIt.skippedAllRanges() && this.skipWriter != null) {
                this.skipWriter.close();
            }
            this.skipGroupCounter.increment(skip);
            this.skipRecCounter.increment(skipRec);
            ReduceTask.this.reportNextRecordRange(this.umbilical, this.grpIndex);
        }

        /**
         * Appends one skipped record to the skip file, creating the writer on first use.
         * The file is named after the task id and placed in the configured skip output path.
         *
         * @param key   key of the skipped record
         * @param value value of the skipped record
         * @throws IOException if the writer cannot be created or the append fails
         */
        private void writeSkippedRec(KEY key, VALUE value) throws IOException {
            if (this.skipWriter == null) {
                Path skipDir = SkipBadRecords.getSkipOutputPath(ReduceTask.this.conf);
                Path skipFile = new Path(skipDir, ReduceTask.this.getTaskID().toString());
                this.skipWriter = SequenceFile.createWriter(skipFile.getFileSystem(ReduceTask.this.conf), (Configuration)ReduceTask.this.conf, skipFile, this.keyClass, this.valClass, SequenceFile.CompressionType.BLOCK, this.reporter);
            }
            this.skipWriter.append(key, value);
        }
    }

    /**
     * Values iterator used by the reduce phase. It counts each value handed out through
     * {@link #next()} and can push the input's progress into the reduce phase.
     *
     * @param <KEY>   reduce input key type
     * @param <VALUE> reduce input value type
     */
    private class ReduceValuesIterator<KEY, VALUE>
    extends Task.ValuesIterator<KEY, VALUE> {
        /** Passes all arguments straight through to the superclass constructor. */
        public ReduceValuesIterator(RawKeyValueIterator in, RawComparator<KEY> comparator, Class<KEY> keyClass, Class<VALUE> valClass, Configuration conf, Progressable reporter) throws IOException {
            super(in, comparator, keyClass, valClass, conf, reporter);
        }

        /**
         * Fetches the next value from the superclass without touching the input value counter.
         *
         * @return the next value of the current key
         */
        protected VALUE moveToNext() {
            return super.next();
        }

        /**
         * Increments the reduce input value counter, then advances the iterator.
         *
         * @return the next value of the current key
         */
        @Override
        public VALUE next() {
            // The counter is bumped before advancing, so it moves even if advancing throws.
            ReduceTask.this.reduceInputValueCounter.increment(1L);
            VALUE value = moveToNext();
            return value;
        }

        /** Copies the input iterator's progress into the reduce phase and signals progress. */
        public void informReduceProgress() {
            ReduceTask.this.reducePhase.set(in.getProgress().get());
            reporter.progress();
        }
    }
}

