/*
 * Decompiled with CFR 0.152.
 */
package org.commoncrawl.query;

import java.io.DataInput;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.DataInputBuffer;
import org.commoncrawl.async.Callbacks;
import org.commoncrawl.async.ConcurrentTask;
import org.commoncrawl.async.EventLoop;
import org.commoncrawl.async.Timer;
import org.commoncrawl.query.BaseConfig;
import org.commoncrawl.query.Query;
import org.commoncrawl.query.QueryCommon;
import org.commoncrawl.query.QueryProgressCallback;
import org.commoncrawl.query.QueryServerSlave;
import org.commoncrawl.query.QueryStatus;
import org.commoncrawl.query.RemoteQueryCompletionCallback;
import org.commoncrawl.query.RemoteQueryInfo;
import org.commoncrawl.query.SlaveStatus;
import org.commoncrawl.rpc.BinaryProtocol;
import org.commoncrawl.rpc.EmptyStruct;
import org.commoncrawl.rpc.IncomingMessageContext;
import org.commoncrawl.rpc.MessageData;
import org.commoncrawl.rpc.RPCException;
import org.commoncrawl.rpc.RPCStruct;
import org.commoncrawl.util.shared.CCStringUtils;
import org.commoncrawl.util.shared.FileUtils;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public abstract class QuerySlaveServer
implements QueryServerSlave,
QueryProgressCallback,
RemoteQueryCompletionCallback {
    static final Log LOG = LogFactory.getLog(QuerySlaveServer.class);
    private LinkedList<Query> _pendingQueries = new LinkedList();
    private Map<Long, Query> _activeQueries = new HashMap<Long, Query>();
    private HashSet<Long> _cancelledQueries = new HashSet();
    private boolean _cancelling = false;
    private BaseConfig _baseConfig;
    private SlaveStatus _slaveStatus = new SlaveStatus();
    private String _fqName;
    private Timer _pollTimer;
    public static final int SLAVE_MAX_CONCURRENT_QUERIES_DEFAULT = 1;
    public static final String SLAVE_MAX_CONCURRENT_QUERIES_PARAM = "query.slave.max.concurrent.queries";
    private int _maxConcurrentQueries = 1;
    private static final int POLL_TIMER_DELAY = 100;

    public abstract Query createQueryObjectGivenType(String var1, int var2, RPCStruct var3) throws IOException;

    public abstract RPCStruct createQueryDataObjectGivenType(String var1);

    public abstract File getLocalQueryTempDir();

    public abstract EventLoop getEventLoop();

    public abstract FileSystem getRemoteFileSystem();

    public abstract Configuration getConfiguration();

    public abstract ExecutorService getQueryThreadPool();

    public QuerySlaveServer(String fqName) {
        this._fqName = fqName;
    }

    public String getFQName() {
        return this._fqName;
    }

    @Override
    public void cancelQuery(IncomingMessageContext<QueryCommon, EmptyStruct> message) throws RPCException {
        if (this._activeQueries.containsKey(((QueryCommon)message.getInput()).getQueryId())) {
            this._cancelledQueries.add(((QueryCommon)message.getInput()).getQueryId());
        }
    }

    @Override
    public void doQuery(IncomingMessageContext<RemoteQueryInfo, QueryStatus> rpcContext) throws RPCException {
        LOG.info((Object)(this._fqName + " Adding Query Type:" + ((RemoteQueryInfo)rpcContext.getInput()).getQueryClassType() + "Id:" + ((RemoteQueryInfo)rpcContext.getInput()).getCommonInfo().getQueryId() + " to Queue."));
        try {
            String queryObjectType = ((RemoteQueryInfo)rpcContext.getInput()).getQueryClassType();
            LOG.info((Object)(this._fqName + " QueryId:" + ((RemoteQueryInfo)rpcContext.getInput()).getCommonInfo().getQueryId() + " ObjectType:" + queryObjectType));
            String queryDataType = ((RemoteQueryInfo)rpcContext.getInput()).getQueryDataClassType();
            LOG.info((Object)(this._fqName + " QueryId:" + ((RemoteQueryInfo)rpcContext.getInput()).getCommonInfo().getQueryId() + " QueryDataType:" + queryDataType));
            RPCStruct queryData = this.createQueryDataObjectGivenType(queryDataType);
            LOG.info((Object)(this._fqName + " QueryId:" + ((RemoteQueryInfo)rpcContext.getInput()).getCommonInfo().getQueryId() + " DeSerializing Query Data"));
            DataInputBuffer inputStream = new DataInputBuffer();
            inputStream.reset(((RemoteQueryInfo)rpcContext.getInput()).getQueryDataBuffer().getReadOnlyBytes(), 0, ((RemoteQueryInfo)rpcContext.getInput()).getQueryDataBuffer().getCount());
            queryData.deserialize((DataInput)inputStream, new BinaryProtocol());
            LOG.info((Object)(this._fqName + " QueryId:" + ((RemoteQueryInfo)rpcContext.getInput()).getCommonInfo().getQueryId() + " Allocating QueryOp Object of type:" + queryObjectType));
            Query queryObject = this.createQueryObjectGivenType(queryObjectType, ((RemoteQueryInfo)rpcContext.getInput()).getShardId(), queryData);
            LOG.info((Object)(this._fqName + " QueryId:" + ((RemoteQueryInfo)rpcContext.getInput()).getCommonInfo().getQueryId() + " Initializing QueryObject"));
            queryObject.initializeRemoteQuery(this, (RemoteQueryInfo)rpcContext.getInput(), queryData);
            LOG.info((Object)(this._fqName + " QueryId:" + ((RemoteQueryInfo)rpcContext.getInput()).getCommonInfo().getQueryId() + " Adding to Pending Queue"));
            if (queryObject.isHighPriorityQuery()) {
                this.activateQuery(queryObject);
            } else {
                this._pendingQueries.add(queryObject);
            }
            this.updateSlaveStatusForQueryObject(queryObject);
            this.potentiallyStartNextQuery();
            ((QueryStatus)rpcContext.getOutput()).merge(queryObject.getQueryStatus());
        }
        catch (Exception e) {
            LOG.error((Object)CCStringUtils.stringifyException((Throwable)e));
            LOG.error((Object)("Query Dispatch for Query Id:" + ((RemoteQueryInfo)rpcContext.getInput()).getCommonInfo().getQueryId() + " Failed with Exception:" + CCStringUtils.stringifyException((Throwable)e)));
            rpcContext.setStatus(MessageData.Status.Error_RequestFailed);
            rpcContext.setErrorDesc(CCStringUtils.stringifyException((Throwable)e));
        }
        rpcContext.completeRequest();
    }

    @Override
    public void heartbeatQuerySlave(IncomingMessageContext<EmptyStruct, SlaveStatus> message) throws RPCException {
        this.sendStatusResponse(message);
    }

    @Override
    public void initializeQuerySlave(final IncomingMessageContext<BaseConfig, SlaveStatus> messageContext) throws RPCException {
        this._maxConcurrentQueries = this.getConfiguration().getInt(SLAVE_MAX_CONCURRENT_QUERIES_PARAM, 1);
        this.terminateAndFlushAllQueries(new Callbacks.Callback(){

            public void execute() {
                QuerySlaveServer.this._activeQueries.clear();
                QuerySlaveServer.this._pendingQueries.clear();
                QuerySlaveServer.this._slaveStatus.clear();
                QuerySlaveServer.this._slaveStatus.setState(1);
                QuerySlaveServer.this._cancelling = false;
                try {
                    QuerySlaveServer.this._baseConfig = (BaseConfig)((BaseConfig)messageContext.getInput()).clone();
                }
                catch (CloneNotSupportedException cloneNotSupportedException) {
                    // empty catch block
                }
                LOG.info((Object)(QuerySlaveServer.this._fqName + " All Data Files successfully loaded. finishing initialization"));
                QuerySlaveServer.this.finishInitialize(messageContext);
            }
        });
        this._pollTimer = new Timer(100L, true, new Timer.Callback(){

            public void timerFired(Timer timer) {
                QuerySlaveServer.this.potentiallyStartNextQuery();
            }
        });
        this.getEventLoop().setTimer(this._pollTimer);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean updateProgress(Query theQueryObject, float percentComplete) {
        LOG.info((Object)(this._fqName + " Update Progress Received for Query:" + theQueryObject.getQueryId() + "pctComplete:" + percentComplete));
        if (!this._cancelling) {
            HashSet<Long> hashSet = this._cancelledQueries;
            synchronized (hashSet) {
                if (this._cancelledQueries.contains(theQueryObject.getQueryId())) {
                    return false;
                }
            }
            this.updateSlaveStatusForQueryObject(theQueryObject);
            return true;
        }
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void queryComplete(Query request, long resultCount) {
        LOG.info((Object)(this._fqName + " QueyComplete received for Query:" + request.getQueryId() + " resultCount:" + resultCount));
        if (!this._cancelling) {
            HashSet<Long> hashSet = this._cancelledQueries;
            synchronized (hashSet) {
                if (this._cancelledQueries.contains(request.getQueryId())) {
                    this._cancelledQueries.remove(request.getQueryId());
                    LOG.info((Object)(this._fqName + " Query Seems to have been cancelled. Explicitly cancelling Query:" + request.getQueryId()));
                    request.getQueryStatus().setStatus(4);
                }
            }
            this.updateSlaveStatusForQueryObject(request);
            this._activeQueries.remove(request.getQueryId());
            FileUtils.recursivelyDeleteFile(this.getTempDirForShardedQuery(request));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void queryFailed(Query request, String reason) {
        LOG.info((Object)(this._fqName + " QueryFailed received for Query:" + request.getQueryId() + " reason:" + reason));
        if (!this._cancelling) {
            HashSet<Long> hashSet = this._cancelledQueries;
            synchronized (hashSet) {
                if (this._cancelledQueries.contains(request.getQueryId())) {
                    this._cancelledQueries.remove(request.getQueryId());
                    request.getQueryStatus().setStatus(4);
                }
            }
            this.updateSlaveStatusForQueryObject(request);
            this._activeQueries.remove(request.getQueryId());
            FileUtils.recursivelyDeleteFile(this.getTempDirForShardedQuery(request));
        }
    }

    private File getTempDirForShardedQuery(Query query) {
        return new File(this.getLocalQueryTempDir(), Long.toString(query.getQueryId()) + "-" + query.getRemoteQueryInfo().getShardId());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void activateQuery(Query queryObject) {
        LOG.info((Object)(this._fqName + " Activating Query:" + queryObject.getQueryId()));
        this._activeQueries.put(queryObject.getQueryId(), queryObject);
        try {
            LOG.info((Object)(this._fqName + " Starting Slave Query for Query:" + queryObject.getQueryId()));
            queryObject.runShardedQuery(this._fqName, this.getRemoteFileSystem(), this.getConfiguration(), this.getEventLoop(), this, this.getTempDirForShardedQuery(queryObject), this, this);
            this.updateSlaveStatusForQueryObject(queryObject);
        }
        catch (IOException e) {
            LOG.info((Object)(this._fqName + " Query Activation for Query:" + queryObject.getQueryId() + " Failed with Exception:" + CCStringUtils.stringifyException((Throwable)e)));
            this._activeQueries.remove(queryObject.getQueryId());
            Query query = queryObject;
            synchronized (query) {
                queryObject.getQueryStatus().setStatus(3);
                queryObject.getQueryStatus().setOptErrorReason(CCStringUtils.stringifyException((Throwable)e));
            }
            FileUtils.recursivelyDeleteFile(this.getTempDirForShardedQuery(queryObject));
            this.updateSlaveStatusForQueryObject(queryObject);
        }
    }

    private void finishInitialize(IncomingMessageContext<BaseConfig, SlaveStatus> rpcContext) {
        this._slaveStatus.setState(2);
        this.sendStatusResponse(rpcContext);
    }

    private void sendStatusResponse(IncomingMessageContext<? extends RPCStruct, SlaveStatus> context) {
        try {
            context.setOutput((SlaveStatus)this._slaveStatus.clone());
            if (((SlaveStatus)context.getOutput()).getQueryStatus().size() != 0) {
                // empty if block
            }
            this._slaveStatus.getQueryStatus().clear();
        }
        catch (CloneNotSupportedException e) {
            // empty catch block
        }
        try {
            context.completeRequest();
        }
        catch (RPCException e) {
            LOG.error((Object)"failed to send StatusResponse to incoming RPC!");
        }
    }

    private void terminateAndFlushAllQueries(final Callbacks.Callback callback) {
        this._cancelling = true;
        if (this._activeQueries.size() == 0) {
            callback.execute();
        } else {
            final Vector<Query> activeQueries = new Vector<Query>(this._activeQueries.values());
            this.getQueryThreadPool().submit(new ConcurrentTask<Boolean>(this.getEventLoop(), new Callable<Boolean>(){

                @Override
                public Boolean call() throws Exception {
                    LOG.info((Object)(QuerySlaveServer.this._fqName + " Starting Cancel Thread"));
                    for (Query query : activeQueries) {
                        LOG.info((Object)(QuerySlaveServer.this._fqName + " Cancelling Query:" + query.getQueryId()));
                        try {
                            query.cancelSlaveQuery();
                        }
                        catch (Exception e) {
                            LOG.error((Object)("Error Cancelling Query:" + query.getQueryId() + " Error:" + CCStringUtils.stringifyException((Throwable)e)));
                        }
                        LOG.info((Object)(QuerySlaveServer.this._fqName + " Cancelled Query:" + query.getQueryId()));
                    }
                    return true;
                }
            }, new ConcurrentTask.CompletionCallback<Boolean>(){

                @Override
                public void taskComplete(Boolean loadResult) {
                    QuerySlaveServer.this._cancelling = false;
                    callback.execute();
                }

                @Override
                public void taskFailed(Exception e) {
                    QuerySlaveServer.this._cancelling = false;
                    LOG.error((Object)CCStringUtils.stringifyException((Throwable)e));
                    callback.execute();
                }
            }));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void updateSlaveStatusForQueryObject(Query theQueryObject) {
        boolean addNewStatus;
        long queryId = -1L;
        int shardId = -1;
        Query query = theQueryObject;
        synchronized (query) {
            queryId = theQueryObject.getQueryId();
            shardId = theQueryObject.getRemoteQueryInfo().getShardId();
        }
        QueryStatus targetStatus = null;
        SlaveStatus slaveStatus = this._slaveStatus;
        synchronized (slaveStatus) {
            for (QueryStatus status : this._slaveStatus.getQueryStatus()) {
                if (status.getQueryId() != queryId || status.getShardId() != shardId) continue;
                targetStatus = status;
                break;
            }
        }
        boolean bl = addNewStatus = targetStatus == null;
        if (targetStatus == null) {
            targetStatus = new QueryStatus();
        }
        Object object = theQueryObject;
        synchronized (object) {
            theQueryObject.getQueryStatus().setFieldClean(3);
            theQueryObject.getQueryStatus().setFieldClean(4);
            try {
                targetStatus.merge(theQueryObject.getQueryStatus());
            }
            catch (CloneNotSupportedException e) {
                // empty catch block
            }
        }
        if (addNewStatus) {
            object = this._slaveStatus;
            synchronized (object) {
                this._slaveStatus.getQueryStatus().add(targetStatus);
            }
        }
    }

    private void potentiallyStartNextQuery() {
        while (this._activeQueries.size() < this._maxConcurrentQueries && this._pendingQueries.size() != 0) {
            Query queryObject = this._pendingQueries.removeFirst();
            this.activateQuery(queryObject);
        }
    }
}

