package threads.magnet.torrent;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import threads.magnet.IConsumers;
import threads.magnet.IProduces;
import threads.magnet.LogUtils;
import threads.magnet.metainfo.TorrentId;
import threads.magnet.net.Peer;
import threads.magnet.protocol.InvalidMessageException;
import threads.magnet.protocol.Message;
import threads.magnet.protocol.Piece;
import threads.magnet.protocol.Request;

/* loaded from: classes3.dex */
public final class PeerRequestConsumer implements IProduces, IConsumers {
    private final Map<Peer, Queue<BlockRead>> completedRequests = new ConcurrentHashMap();
    private final DataWorker dataWorker;
    private final TorrentId torrentId;

    public PeerRequestConsumer(TorrentId torrentId, DataWorker dataWorker) {
        this.torrentId = torrentId;
        this.dataWorker = dataWorker;
    }

    private CompletableFuture<BlockRead> addBlockRequest(Peer peer, Request request) {
        return this.dataWorker.addBlockRequest(this.torrentId, peer, request.pieceIndex(), request.offset(), request.length());
    }

    private void consume(Request request, final MessageContext messageContext) {
        if (messageContext.getConnectionState().isChoking()) {
            return;
        }
        addBlockRequest(messageContext.getPeer(), request).whenComplete(new BiConsumer() { // from class: threads.magnet.torrent.PeerRequestConsumer$$ExternalSyntheticLambda0
            @Override // java.util.function.BiConsumer
            public final void accept(Object obj, Object obj2) {
                PeerRequestConsumer.this.lambda$consume$0(messageContext, (BlockRead) obj, (Throwable) obj2);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void doConsume(Message message, MessageContext messageContext) {
        if (message instanceof Request) {
            consume((Request) message, messageContext);
        }
    }

    private Queue<BlockRead> getCompletedRequestsForPeer(Peer peer) {
        Queue<BlockRead> queue = this.completedRequests.get(peer);
        if (queue != null) {
            return queue;
        }
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        Queue<BlockRead> putIfAbsent = this.completedRequests.putIfAbsent(peer, concurrentLinkedQueue);
        return putIfAbsent != null ? putIfAbsent : concurrentLinkedQueue;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public /* synthetic */ void lambda$consume$0(MessageContext messageContext, BlockRead blockRead, Throwable th) {
        if (th != null) {
            LogUtils.warning(LogUtils.TAG, th.getMessage());
            return;
        }
        if (blockRead.getError().isPresent()) {
            LogUtils.warning(LogUtils.TAG, blockRead.getError().get().getMessage());
        } else if (blockRead.isRejected()) {
            LogUtils.warning(LogUtils.TAG, "Block rejected");
        } else {
            getCompletedRequestsForPeer(messageContext.getPeer()).add(blockRead);
        }
    }

    @Override // threads.magnet.IConsumers
    public List<MessageConsumer<? extends Message>> getConsumers() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new MessageConsumer<Request>() { // from class: threads.magnet.torrent.PeerRequestConsumer.1
            @Override // threads.magnet.torrent.MessageConsumer
            public void consume(Request request, MessageContext messageContext) {
                PeerRequestConsumer.this.doConsume(request, messageContext);
            }

            @Override // threads.magnet.torrent.MessageConsumer
            public Class<Request> getConsumedType() {
                return Request.class;
            }
        });
        return arrayList;
    }

    @Override // threads.magnet.IProduces
    public void produce(Consumer<Message> consumer, MessageContext messageContext) {
        Queue<BlockRead> completedRequestsForPeer = getCompletedRequestsForPeer(messageContext.getPeer());
        while (true) {
            BlockRead poll = completedRequestsForPeer.poll();
            if (poll == null) {
                return;
            }
            try {
                consumer.accept(Piece.create(poll.getPieceIndex(), poll.getOffset(), poll.getLength(), poll.getReader()));
            } catch (InvalidMessageException e) {
                throw new RuntimeException("Failed to send PIECE", e);
            }
        }
    }
}
