package threads.magnet.peerexchange;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import threads.magnet.IConsumers;
import threads.magnet.event.EventSource;
import threads.magnet.event.PeerConnectedEvent;
import threads.magnet.event.PeerDisconnectedEvent;
import threads.magnet.metainfo.TorrentId;
import threads.magnet.net.ConnectionKey;
import threads.magnet.peer.PeerRegistry$$ExternalSyntheticLambda5;
import threads.magnet.peer.PeerSource;
import threads.magnet.peer.PeerSourceFactory;
import threads.magnet.peerexchange.PeerExchangePeerSourceFactory;
import threads.magnet.protocol.Message;
import threads.magnet.protocol.extended.ExtendedHandshake;
import threads.magnet.service.RuntimeLifecycleBinder;
import threads.magnet.torrent.MessageConsumer;
import threads.magnet.torrent.MessageContext;

/* loaded from: classes3.dex */
public class PeerExchangePeerSourceFactory implements PeerSourceFactory, IConsumers {
    private static final Duration CLEANER_INTERVAL = Duration.ofSeconds(37);
    private final Map<TorrentId, PeerExchangePeerSource> peerSources = new ConcurrentHashMap();
    private final Map<TorrentId, Queue<PeerEvent>> peerEvents = new ConcurrentHashMap();
    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Set<ConnectionKey> peers = ConcurrentHashMap.newKeySet();
    private final Map<ConnectionKey, Long> lastSentPEXMessage = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: classes3.dex */
    public class Cleaner implements Runnable {
        private Cleaner() {
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public static /* synthetic */ Long lambda$run$0(Long l, Long l2) {
            return l.longValue() < l2.longValue() ? l : l2;
        }

        @Override // java.lang.Runnable
        public void run() {
            PeerExchangePeerSourceFactory.this.rwLock.writeLock().lock();
            try {
                long longValue = ((Long) PeerExchangePeerSourceFactory.this.lastSentPEXMessage.values().stream().reduce(Long.MAX_VALUE, new BinaryOperator() { // from class: threads.magnet.peerexchange.PeerExchangePeerSourceFactory$Cleaner$$ExternalSyntheticLambda0
                    @Override // java.util.function.BiFunction
                    public final Object apply(Object obj, Object obj2) {
                        return PeerExchangePeerSourceFactory.Cleaner.lambda$run$0((Long) obj, (Long) obj2);
                    }
                })).longValue();
                for (Queue queue : PeerExchangePeerSourceFactory.this.peerEvents.values()) {
                    while (true) {
                        PeerEvent peerEvent = (PeerEvent) queue.peek();
                        if (peerEvent != null && peerEvent.getInstant() <= longValue) {
                            queue.poll();
                        }
                    }
                }
            } finally {
                PeerExchangePeerSourceFactory.this.rwLock.writeLock().unlock();
            }
        }
    }

    public PeerExchangePeerSourceFactory(EventSource eventSource, RuntimeLifecycleBinder runtimeLifecycleBinder) {
        if (PeerExchangeConfig.getMaxMessageInterval().compareTo(PeerExchangeConfig.getMinMessageInterval()) < 0) {
            throw new IllegalArgumentException("Max message interval is greater than min interval");
        }
        eventSource.onPeerConnected(new Consumer() { // from class: threads.magnet.peerexchange.PeerExchangePeerSourceFactory$$ExternalSyntheticLambda0
            @Override // java.util.function.Consumer
            public final void accept(Object obj) {
                PeerExchangePeerSourceFactory.this.lambda$new$0((PeerConnectedEvent) obj);
            }
        }).onPeerDisconnected(new Consumer() { // from class: threads.magnet.peerexchange.PeerExchangePeerSourceFactory$$ExternalSyntheticLambda1
            @Override // java.util.function.Consumer
            public final void accept(Object obj) {
                PeerExchangePeerSourceFactory.this.lambda$new$1((PeerDisconnectedEvent) obj);
            }
        });
        final ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { // from class: threads.magnet.peerexchange.PeerExchangePeerSourceFactory$$ExternalSyntheticLambda2
            @Override // java.util.concurrent.ThreadFactory
            public final Thread newThread(Runnable runnable) {
                return PeerExchangePeerSourceFactory.lambda$new$2(runnable);
            }
        });
        runtimeLifecycleBinder.onStartup("Schedule periodic cleanup of PEX messages", new Runnable() { // from class: threads.magnet.peerexchange.PeerExchangePeerSourceFactory$$ExternalSyntheticLambda3
            @Override // java.lang.Runnable
            public final void run() {
                PeerExchangePeerSourceFactory.this.lambda$new$3(newSingleThreadScheduledExecutor);
            }
        });
        Objects.requireNonNull(newSingleThreadScheduledExecutor);
        runtimeLifecycleBinder.onShutdown("Shutdown PEX cleanup scheduler", new PeerRegistry$$ExternalSyntheticLambda5(newSingleThreadScheduledExecutor));
    }

    private void consume(PeerExchange peerExchange, MessageContext messageContext) {
        getOrCreatePeerSource(messageContext.getTorrentId()).addMessage(peerExchange);
    }

    private void consume(ExtendedHandshake extendedHandshake, MessageContext messageContext) {
        if (extendedHandshake.getSupportedMessageTypes().contains("ut_pex")) {
            this.peers.add(messageContext.getConnectionKey());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void doConsume(Message message, MessageContext messageContext) {
        if (message instanceof ExtendedHandshake) {
            consume((ExtendedHandshake) message, messageContext);
        }
        if (message instanceof PeerExchange) {
            consume((PeerExchange) message, messageContext);
        }
    }

    private PeerExchangePeerSource getOrCreatePeerSource(TorrentId torrentId) {
        PeerExchangePeerSource peerExchangePeerSource = this.peerSources.get(torrentId);
        if (peerExchangePeerSource != null) {
            return peerExchangePeerSource;
        }
        PeerExchangePeerSource peerExchangePeerSource2 = new PeerExchangePeerSource();
        PeerExchangePeerSource putIfAbsent = this.peerSources.putIfAbsent(torrentId, peerExchangePeerSource2);
        return putIfAbsent != null ? putIfAbsent : peerExchangePeerSource2;
    }

    private Queue<PeerEvent> getPeerEvents(TorrentId torrentId) {
        Queue<PeerEvent> queue = this.peerEvents.get(torrentId);
        if (queue != null) {
            return queue;
        }
        PriorityBlockingQueue priorityBlockingQueue = new PriorityBlockingQueue();
        Queue<PeerEvent> putIfAbsent = this.peerEvents.putIfAbsent(torrentId, priorityBlockingQueue);
        return putIfAbsent != null ? putIfAbsent : priorityBlockingQueue;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public /* synthetic */ void lambda$new$0(PeerConnectedEvent peerConnectedEvent) {
        onPeerConnected(peerConnectedEvent.getConnectionKey());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public /* synthetic */ void lambda$new$1(PeerDisconnectedEvent peerDisconnectedEvent) {
        onPeerDisconnected(peerDisconnectedEvent.getConnectionKey());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static /* synthetic */ Thread lambda$new$2(Runnable runnable) {
        return new Thread(runnable, "bt.peerexchange.cleaner");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public /* synthetic */ void lambda$new$3(ScheduledExecutorService scheduledExecutorService) {
        Cleaner cleaner = new Cleaner();
        Duration duration = CLEANER_INTERVAL;
        scheduledExecutorService.scheduleWithFixedDelay(cleaner, duration.toMillis(), duration.toMillis(), TimeUnit.MILLISECONDS);
    }

    private void onPeerConnected(ConnectionKey connectionKey) {
        getPeerEvents(connectionKey.getTorrentId()).add(PeerEvent.added(connectionKey.getPeer()));
    }

    private void onPeerDisconnected(ConnectionKey connectionKey) {
        getPeerEvents(connectionKey.getTorrentId()).add(PeerEvent.dropped(connectionKey.getPeer()));
        this.peers.remove(connectionKey);
        this.lastSentPEXMessage.remove(connectionKey);
    }

    @Override // threads.magnet.IConsumers
    public List<MessageConsumer<? extends Message>> getConsumers() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new MessageConsumer<PeerExchange>() { // from class: threads.magnet.peerexchange.PeerExchangePeerSourceFactory.1
            @Override // threads.magnet.torrent.MessageConsumer
            public void consume(PeerExchange peerExchange, MessageContext messageContext) {
                PeerExchangePeerSourceFactory.this.doConsume(peerExchange, messageContext);
            }

            @Override // threads.magnet.torrent.MessageConsumer
            public Class<PeerExchange> getConsumedType() {
                return PeerExchange.class;
            }
        });
        arrayList.add(new MessageConsumer<ExtendedHandshake>() { // from class: threads.magnet.peerexchange.PeerExchangePeerSourceFactory.2
            @Override // threads.magnet.torrent.MessageConsumer
            public void consume(ExtendedHandshake extendedHandshake, MessageContext messageContext) {
                PeerExchangePeerSourceFactory.this.doConsume(extendedHandshake, messageContext);
            }

            @Override // threads.magnet.torrent.MessageConsumer
            public Class<ExtendedHandshake> getConsumedType() {
                return ExtendedHandshake.class;
            }
        });
        return arrayList;
    }

    @Override // threads.magnet.peer.PeerSourceFactory
    public PeerSource getPeerSource(TorrentId torrentId) {
        return getOrCreatePeerSource(torrentId);
    }
}
