/*
 * Decompiled with CFR 0.152.
 */
package org.commoncrawl.rpc;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import org.commoncrawl.async.Callbacks;
import org.commoncrawl.async.EventLoop;
import org.commoncrawl.async.Timer;
import org.commoncrawl.rpc.Channel;
import org.commoncrawl.rpc.InProcessActor;
import org.commoncrawl.rpc.IncomingMessageContext;
import org.commoncrawl.rpc.MessageData;
import org.commoncrawl.rpc.OutgoingMessageContext;
import org.commoncrawl.rpc.RPCActorService;
import org.commoncrawl.rpc.RPCChannel;
import org.commoncrawl.rpc.RPCException;
import org.commoncrawl.rpc.RPCServerChannel;
import org.commoncrawl.rpc.RPCTestService;
import org.commoncrawl.rpc.UnitTestStruct1;
import org.junit.Test;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class RPCTestServer
extends RPCActorService
implements RPCTestService {
    public RPCTestServer() {
        super(null);
    }

    @Override
    public void hello(IncomingMessageContext<UnitTestStruct1, UnitTestStruct1> rpcContext) throws RPCException {
        System.out.println("Server:Received Request:" + ((UnitTestStruct1)rpcContext.getInput()).getIntType());
        ((UnitTestStruct1)rpcContext.getOutput()).setStringType(((UnitTestStruct1)rpcContext.getInput()).getStringType() + " back");
        ((UnitTestStruct1)rpcContext.getOutput()).setIntType(((UnitTestStruct1)rpcContext.getInput()).getIntType());
        rpcContext.completeRequest();
    }

    @Test
    public void testServerRPC() throws Exception {
        final EventLoop eventLoop = new EventLoop();
        eventLoop.start();
        RPCTestServer server = new RPCTestServer();
        InetSocketAddress localAddress = new InetSocketAddress("localhost", 0);
        InetSocketAddress address = new InetSocketAddress("localhost", 9000);
        RPCServerChannel channel = new RPCServerChannel(server, eventLoop, address, null);
        server.bindActor(channel, RPCTestService.spec, server, null);
        server.start();
        RPCChannel clientChannel = new RPCChannel(eventLoop, null, localAddress, address, null);
        clientChannel.open();
        RPCTestService.AsyncStub stub = new RPCTestService.AsyncStub(clientChannel, eventLoop);
        UnitTestStruct1 input = new UnitTestStruct1();
        for (int i = 0; i < 1000; ++i) {
            input.setStringType("hello" + Integer.toString(i));
            input.setIntType(i);
            System.out.println("Sending Request:" + i);
            stub.hello(input, new OutgoingMessageContext.Callback<UnitTestStruct1, UnitTestStruct1>(){

                @Override
                public void requestComplete(OutgoingMessageContext<UnitTestStruct1, UnitTestStruct1> request) {
                    System.out.println("Request returned with status:" + request.getStatus().toString());
                    if (request.getStatus() == MessageData.Status.Success) {
                        System.out.println("Returned string value is:" + ((UnitTestStruct1)request.getOutput()).getStringType());
                        if (((UnitTestStruct1)request.getOutput()).getIntType() == 999) {
                            System.out.println("Got Final Response. Stopping Event Loop from within Callback");
                            eventLoop.stop();
                        }
                    }
                }
            });
            System.out.println("Sent Request:" + i);
        }
        eventLoop.getEventThread().join();
    }

    public static void main(String[] args) {
        RPCTestServer server = new RPCTestServer();
        try {
            server.testServerRPC();
        }
        catch (Exception e2) {
            e2.printStackTrace();
        }
        try {
            final EventLoop outerEventLoop = new EventLoop();
            outerEventLoop.start();
            final ThreadPoolExecutor targetExecutor = (ThreadPoolExecutor)Executors.newCachedThreadPool();
            final ThreadPoolExecutor sourceExecutor = (ThreadPoolExecutor)Executors.newCachedThreadPool();
            outerEventLoop.queueAsyncCallback(new Callbacks.Callback(){

                public void execute() {
                    try {
                        InProcessActor localActor = RPCTestService.InProcessActorFactory.createInProcessActor(new RPCTestService(){
                            int responseCount = 0;

                            @Override
                            public void hello(final IncomingMessageContext<UnitTestStruct1, UnitTestStruct1> rpcContext) throws RPCException {
                                System.out.println("Actor Received Hello for Message:" + ((UnitTestStruct1)rpcContext.getInput()).getIntType() + " Thread:" + Thread.currentThread().getId());
                                outerEventLoop.setTimer(new Timer((long)(Math.random() * 10.0), false, new Timer.Callback(){

                                    public void timerFired(Timer timer) {
                                        System.out.println("Actor Processing Delayed Hello Response for Message:" + ((UnitTestStruct1)rpcContext.getInput()).getIntType() + " Thread:" + Thread.currentThread().getId());
                                        ++responseCount;
                                        ((UnitTestStruct1)rpcContext.getOutput()).setIntType(((UnitTestStruct1)rpcContext.getInput()).getIntType());
                                        if (responseCount == 100) {
                                            System.out.println("Hit 100 Responses. Killing Actor Thread");
                                            ((UnitTestStruct1)rpcContext.getOutput()).setLongType(1L);
                                            ((InProcessActor.InProcessChannel)rpcContext.getChannel()).getActor().stop();
                                        }
                                        try {
                                            rpcContext.completeRequest();
                                        }
                                        catch (RPCException e) {
                                            e.printStackTrace();
                                        }
                                    }
                                }));
                            }
                        }, targetExecutor, new InProcessActor.Events(){

                            public void onStartup(InProcessActor actor) {
                                System.out.println("OnStartup - ThreadId:" + Thread.currentThread().getId());
                            }

                            public void onShutdown(InProcessActor actor) {
                                System.out.println("OnShutdown- ThreadId:" + Thread.currentThread().getId());
                            }
                        });
                        for (int i = 0; i < 1000; ++i) {
                            Channel localChannel = null;
                            localChannel = i % 2 == 0 ? localActor.createChannel(outerEventLoop) : localActor.createChannel(sourceExecutor);
                            RPCTestService.AsyncStub stub = new RPCTestService.AsyncStub(localChannel, outerEventLoop);
                            UnitTestStruct1 struct1 = new UnitTestStruct1();
                            struct1.setIntType(i);
                            System.out.println("Sending Request:" + struct1.getIntType() + " From Thread:" + Thread.currentThread().getId());
                            stub.hello(struct1, new OutgoingMessageContext.Callback<UnitTestStruct1, UnitTestStruct1>(){

                                @Override
                                public void requestComplete(OutgoingMessageContext<UnitTestStruct1, UnitTestStruct1> request) {
                                    System.out.println("Received Request Complete for Request:" + request.getRequestId() + " Thread:" + Thread.currentThread().getId());
                                    if (((UnitTestStruct1)request.getOutput()).getLongType() == 1L) {
                                        System.out.println("Shutdown Cmd Received");
                                        outerEventLoop.queueAsyncCallback(new Callbacks.Callback(){

                                            public void execute() {
                                                System.out.println("Killing Outer Event Loop - Thread:" + Thread.currentThread().getId());
                                                outerEventLoop.stop();
                                                targetExecutor.shutdown();
                                                sourceExecutor.shutdown();
                                            }
                                        });
                                    }
                                }
                            });
                        }
                        localActor.start();
                    }
                    catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        catch (IOException iOException) {
            // empty catch block
        }
    }
}

