package net.derkholm.nmica.utils.mq;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:net/derkholm/nmica/utils/mq/MessageQueue.class */
public class MessageQueue<T> {
    private static final int STATE_NEW = 0;
    private static final int STATE_RUNNING = 1;
    private static final int STATE_DRAINING = 2;
    private static final int STATE_DEAD = 3;
    private int state;
    private DatagramChannel channel;
    private InetSocketAddress endpoint;
    private Selector selector;
    private SelectionKey key;
    private MessageCodec<T> codec;
    private ConcurrentMap<SocketAddress, Peer> peers;
    private Peer[] peerArray;
    private int peerRoundRobinCounter;
    private ConcurrentMap<Peer, BlockingQueue<T>> outQbyPeer;
    private Queue<Message<T>> inQ;
    private Semaphore inQnonEmpty;
    private int packetSize;
    private int flushQueueLength;
    private int txBytes;
    private int txPackets;
    private int rxBytes;
    private int rxPackets;
    private boolean currentlyInWriteMode;

    /* loaded from: input_file:net/derkholm/nmica/utils/mq/MessageQueue$Message.class */
    public static final class Message<T> {
        private final Peer sender;
        private final T body;

        private Message(Peer peer, T t) {
            this.sender = peer;
            this.body = t;
        }

        public Peer getSender() {
            return this.sender;
        }

        public T getBody() {
            return this.body;
        }
    }

    /* loaded from: input_file:net/derkholm/nmica/utils/mq/MessageQueue$Peer.class */
    public static final class Peer {
        private final SocketAddress endpoint;
        private Object userData;

        private Peer(SocketAddress socketAddress) {
            this.endpoint = socketAddress;
        }

        public SocketAddress getEndpoint() {
            return this.endpoint;
        }

        public void setUserData(Object obj) {
            this.userData = obj;
        }

        public Object getUserData() {
            return this.userData;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:net/derkholm/nmica/utils/mq/MessageQueue$QMon.class */
    public class QMon extends Thread {
        private QMon() {
        }

        /* JADX WARN: Multi-variable type inference failed */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                synchronized (MessageQueue.this) {
                    if (MessageQueue.this.state != 0) {
                        throw new IllegalStateException();
                    }
                    MessageQueue.this.state = MessageQueue.STATE_RUNNING;
                }
                ByteBuffer allocateDirect = ByteBuffer.allocateDirect(65536);
                ByteBuffer allocateDirect2 = ByteBuffer.allocateDirect(65536);
                allocateDirect.order(ByteOrder.LITTLE_ENDIAN);
                allocateDirect2.order(ByteOrder.LITTLE_ENDIAN);
                Peer peer = MessageQueue.STATE_NEW;
                while (true) {
                    boolean z = MessageQueue.STATE_NEW;
                    boolean z2 = MessageQueue.this.state == MessageQueue.STATE_DRAINING;
                    Iterator it = MessageQueue.this.outQbyPeer.values().iterator();
                    while (true) {
                        if (!it.hasNext()) {
                            break;
                        } else if (!((BlockingQueue) it.next()).isEmpty()) {
                            z = MessageQueue.STATE_RUNNING;
                            break;
                        }
                    }
                    if (!z && peer == null && z2) {
                        return;
                    }
                    if (z || peer != null) {
                        MessageQueue.this.key.interestOps(5);
                        MessageQueue.this.currentlyInWriteMode = true;
                    } else {
                        MessageQueue.this.key.interestOps(MessageQueue.STATE_RUNNING);
                        MessageQueue.this.currentlyInWriteMode = false;
                    }
                    if (MessageQueue.this.selector.select(100L) != 0) {
                        Iterator<SelectionKey> it2 = MessageQueue.this.selector.selectedKeys().iterator();
                        while (it2.hasNext()) {
                            if (it2.next() == MessageQueue.this.key) {
                                if (MessageQueue.this.key.isWritable()) {
                                    int i = MessageQueue.STATE_NEW;
                                    int i2 = MessageQueue.STATE_NEW;
                                    if (peer != null) {
                                        MessageQueue.this.channel.send(allocateDirect2, peer.getEndpoint());
                                        peer = MessageQueue.STATE_NEW;
                                        i2 += MessageQueue.STATE_RUNNING;
                                    }
                                    synchronized (MessageQueue.this) {
                                        boolean z3 = MessageQueue.STATE_RUNNING;
                                        while (z3) {
                                            BlockingQueue blockingQueue = MessageQueue.STATE_NEW;
                                            int length = MessageQueue.this.peerArray.length;
                                            int i3 = MessageQueue.STATE_RUNNING;
                                            while (true) {
                                                if (i3 > length) {
                                                    break;
                                                }
                                                int i4 = MessageQueue.this.peerRoundRobinCounter + i3;
                                                while (i4 >= length) {
                                                    i4 -= length;
                                                }
                                                blockingQueue = (BlockingQueue) MessageQueue.this.outQbyPeer.get(MessageQueue.this.peerArray[i4]);
                                                if (blockingQueue != null && !blockingQueue.isEmpty()) {
                                                    peer = MessageQueue.this.peerArray[i4];
                                                    MessageQueue.this.peerRoundRobinCounter = i4;
                                                    break;
                                                }
                                                i3 += MessageQueue.STATE_RUNNING;
                                            }
                                            if (blockingQueue == null || blockingQueue.isEmpty()) {
                                                break;
                                            }
                                            i += MessageQueue.STATE_RUNNING;
                                            allocateDirect2.clear();
                                            int i5 = MessageQueue.STATE_NEW;
                                            while (true) {
                                                Object peek = blockingQueue.peek();
                                                if (peek == null) {
                                                    break;
                                                }
                                                int position = allocateDirect2.position() + MessageQueue.this.codec.sizeMessage(peek);
                                                if (i5 != 0 && position >= MessageQueue.this.packetSize) {
                                                    break;
                                                }
                                                MessageQueue.this.codec.writeMessage(allocateDirect2, peek);
                                                blockingQueue.remove();
                                                i5 += MessageQueue.STATE_RUNNING;
                                            }
                                            allocateDirect2.flip();
                                            int send = MessageQueue.this.channel.send(allocateDirect2, peer.getEndpoint());
                                            if (send > 0) {
                                                peer = MessageQueue.STATE_NEW;
                                                i2 += MessageQueue.STATE_RUNNING;
                                                MessageQueue.access$1204(MessageQueue.this);
                                                MessageQueue.access$1312(MessageQueue.this, send);
                                            } else {
                                                z3 = MessageQueue.STATE_NEW;
                                            }
                                        }
                                    }
                                }
                                if (MessageQueue.this.key.isReadable()) {
                                    ArrayList arrayList = new ArrayList();
                                    while (true) {
                                        allocateDirect.clear();
                                        SocketAddress receive = MessageQueue.this.channel.receive(allocateDirect);
                                        allocateDirect.flip();
                                        if (allocateDirect.limit() == 0) {
                                            break;
                                        }
                                        MessageQueue.access$1404(MessageQueue.this);
                                        MessageQueue.access$1512(MessageQueue.this, allocateDirect.limit());
                                        Peer peer2 = MessageQueue.this.getPeer(receive);
                                        while (allocateDirect.position() < allocateDirect.limit()) {
                                            arrayList.add(new Message(peer2, MessageQueue.this.codec.readMessage(allocateDirect)));
                                        }
                                    }
                                    synchronized (MessageQueue.this.inQ) {
                                        int size = MessageQueue.this.inQ.size();
                                        MessageQueue.this.inQ.addAll(arrayList);
                                        if (size == 0 && !arrayList.isEmpty()) {
                                            MessageQueue.this.inQnonEmpty.drainPermits();
                                            MessageQueue.this.inQnonEmpty.release();
                                        }
                                    }
                                }
                                it2.remove();
                            }
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public int getTxBytes() {
        return this.txBytes;
    }

    public int getRxBytes() {
        return this.rxBytes;
    }

    public int getTxPackets() {
        return this.txPackets;
    }

    public int getRxPackets() {
        return this.rxPackets;
    }

    public void setCodec(MessageCodec<T> messageCodec) {
        this.codec = messageCodec;
    }

    public MessageCodec<T> getCodec() {
        return this.codec;
    }

    public void setFlushThreshold(int i) {
        this.flushQueueLength = i;
    }

    public MessageQueue(int i) throws IOException {
        this.state = STATE_NEW;
        this.peers = new ConcurrentHashMap();
        this.peerArray = new Peer[STATE_NEW];
        this.peerRoundRobinCounter = STATE_NEW;
        this.outQbyPeer = new ConcurrentHashMap();
        this.inQ = new LinkedList();
        this.inQnonEmpty = new Semaphore(STATE_NEW);
        this.packetSize = 7900;
        this.flushQueueLength = 20;
        this.txBytes = STATE_NEW;
        this.txPackets = STATE_NEW;
        this.rxBytes = STATE_NEW;
        this.rxPackets = STATE_NEW;
        this.currentlyInWriteMode = false;
        this.channel = DatagramChannel.open();
        this.channel.socket().bind(new InetSocketAddress(i));
        this.channel.configureBlocking(false);
        this.endpoint = new InetSocketAddress(InetAddress.getLocalHost(), this.channel.socket().getLocalPort());
        this.selector = Selector.open();
        this.key = this.channel.register(this.selector, STATE_RUNNING);
    }

    public MessageQueue() throws IOException {
        this(STATE_NEW);
    }

    public Peer getPeer(SocketAddress socketAddress) {
        Peer peer = this.peers.get(socketAddress);
        if (peer == null) {
            peer = new Peer(socketAddress);
            Peer putIfAbsent = this.peers.putIfAbsent(socketAddress, peer);
            if (putIfAbsent != null) {
                peer = putIfAbsent;
            } else {
                synchronized (this) {
                    this.peerArray = (Peer[]) this.peers.values().toArray(new Peer[STATE_NEW]);
                }
            }
        }
        return peer;
    }

    public synchronized void sendMessage(Peer peer, T t) throws QueueDeadException {
        if (this.state == 3) {
            throw new QueueDeadException();
        }
        BlockingQueue<T> blockingQueue = this.outQbyPeer.get(peer);
        if (blockingQueue == null) {
            blockingQueue = new LinkedBlockingQueue();
            BlockingQueue<T> putIfAbsent = this.outQbyPeer.putIfAbsent(peer, blockingQueue);
            if (putIfAbsent != null) {
                blockingQueue = putIfAbsent;
            }
        }
        while (true) {
            try {
                blockingQueue.put(t);
                if (blockingQueue.size() >= this.flushQueueLength) {
                    flush();
                    return;
                }
                return;
            } catch (InterruptedException e) {
            }
        }
    }

    public Message<T> next() throws QueueDeadException {
        while (true) {
            try {
                this.inQnonEmpty.acquire();
                synchronized (this.inQ) {
                    if (!this.inQ.isEmpty()) {
                        Message<T> remove = this.inQ.remove();
                        if (this.inQ.isEmpty()) {
                            flush();
                        } else {
                            this.inQnonEmpty.release();
                        }
                        return remove;
                    }
                    if (this.state == 3) {
                        this.inQnonEmpty.release();
                        throw new QueueDeadException();
                    }
                }
            } catch (InterruptedException e) {
            }
        }
    }

    public Message<T> next(long j) throws QueueDeadException, InterruptedException {
        Message<T> remove;
        if (!this.inQnonEmpty.tryAcquire(j, TimeUnit.MICROSECONDS)) {
            return null;
        }
        synchronized (this.inQ) {
            remove = this.inQ.remove();
            if (this.inQ.poll() != null) {
                this.inQnonEmpty.release();
            } else if (remove != null && this.state == 3) {
                this.inQnonEmpty.release();
                throw new QueueDeadException();
            }
        }
        return remove;
    }

    public Message<T> nextNonBlocking() throws QueueDeadException {
        Message<T> remove;
        if (!this.inQnonEmpty.tryAcquire()) {
            return null;
        }
        synchronized (this.inQ) {
            remove = this.inQ.remove();
            if (this.inQ.poll() != null) {
                this.inQnonEmpty.release();
            } else {
                flush();
            }
            if (remove == null && this.state == 3) {
                this.inQnonEmpty.release();
                throw new QueueDeadException();
            }
        }
        return remove;
    }

    public void next(Collection<Message<T>> collection) throws QueueDeadException {
        boolean z;
        loop0: while (true) {
            try {
                this.inQnonEmpty.acquire();
                synchronized (this.inQ) {
                    z = STATE_NEW;
                    while (!this.inQ.isEmpty()) {
                        collection.add(this.inQ.remove());
                        z = STATE_RUNNING;
                        continue;
                    }
                    break loop0;
                }
            } catch (InterruptedException e) {
            }
        }
        if (z || this.state != 3) {
            flush();
        } else {
            this.inQnonEmpty.release();
            throw new QueueDeadException();
        }
    }

    public void flush() {
        if (this.currentlyInWriteMode) {
            return;
        }
        this.selector.wakeup();
    }

    public synchronized void start() {
        if (this.state != 0) {
            throw new IllegalStateException("Only newly created MessageQueues can be started");
        }
        new QMon().start();
    }

    public synchronized void shutdown() {
        if (this.state != STATE_RUNNING) {
            throw new IllegalStateException("MessageQueue is not currently running, can't shutdown");
        }
        this.state = STATE_DRAINING;
    }

    public InetSocketAddress getEndpoint() {
        return this.endpoint;
    }

    static /* synthetic */ int access$1204(MessageQueue messageQueue) {
        int i = messageQueue.txPackets + STATE_RUNNING;
        messageQueue.txPackets = i;
        return i;
    }

    static /* synthetic */ int access$1312(MessageQueue messageQueue, int i) {
        int i2 = messageQueue.txBytes + i;
        messageQueue.txBytes = i2;
        return i2;
    }

    static /* synthetic */ int access$1404(MessageQueue messageQueue) {
        int i = messageQueue.rxPackets + STATE_RUNNING;
        messageQueue.rxPackets = i;
        return i;
    }

    static /* synthetic */ int access$1512(MessageQueue messageQueue, int i) {
        int i2 = messageQueue.rxBytes + i;
        messageQueue.rxBytes = i2;
        return i2;
    }
}
