/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sling.distribution.journal.impl.publisher;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.impl.publisher.QueueCacheSeeder;
import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Collects the package messages of a topic whose offsets lie below an exclusive upper bound.
 *
 * <p>The constructor creates a poller on the package topic, using the assignment returned by
 * {@code messagingProvider.assignTo(minOffset)} (presumably positioning the poller at
 * {@code minOffset} -- confirm against the provider). Every handled message with an offset
 * below {@code maxOffsetExclusive} is stored, except messages of type
 * {@link PackageMessage.ReqType#TEST}. The first handled message with an offset at or beyond
 * the upper bound marks the range as complete.
 *
 * <p>{@link #fetchRange()} closes the poller before it returns, so an instance is meant to be
 * used for a single fetch.
 */
@ParametersAreNonnullByDefault
public class RangePoller {
    /** Seed delay of 30 seconds; not referenced inside this class, presumably a default for callers. */
    public static final int DEFAULT_SEED_DELAY_SECONDS = 30;
    private static final Logger LOG = LoggerFactory.getLogger(RangePoller.class);
    /** Exclusive upper bound of the offsets to collect. */
    private final long maxOffset;
    /** Offset handed to {@code assignTo}; otherwise only used for logging. */
    private final long minOffset;
    private final Closeable headPoller;
    /** Released by the handler once a message with offset >= maxOffset has been seen. */
    private final CountDownLatch fetched = new CountDownLatch(1);
    /** Messages collected by the handler; returned as-is by {@link #fetchRange()}. */
    private final List<FullMessage<PackageMessage>> messages;
    /** How long {@link #fetchRange()} waits before sending a seeding message. */
    private final int seedDelaySeconds;
    private final MessageSender<PackageMessage> sender;

    /**
     * Creates the sender and starts polling the package topic.
     *
     * @param messagingProvider provider used to create the sender and the poller
     * @param packageTopic topic to read packages from (and to send the seeding message to)
     * @param minOffset offset passed to {@code messagingProvider.assignTo}
     * @param maxOffsetExclusive first offset that is no longer part of the range
     * @param seedDelaySeconds seconds {@link #fetchRange()} waits before seeding the topic
     */
    public RangePoller(MessagingProvider messagingProvider, String packageTopic, long minOffset, long maxOffsetExclusive, int seedDelaySeconds) {
        this.maxOffset = maxOffsetExclusive;
        this.minOffset = minOffset;
        this.seedDelaySeconds = seedDelaySeconds;
        this.messages = new ArrayList<>();
        String assign = messagingProvider.assignTo(minOffset);
        LOG.info("Fetching offsets [{},{}[", minOffset, maxOffsetExclusive);
        this.sender = messagingProvider.createSender(packageTopic);
        // Created last: the handler reads maxOffset, messages and fetched, which are all set by now.
        this.headPoller = messagingProvider.createPoller(packageTopic, Reset.earliest, assign, new HandlerAdapter[]{HandlerAdapter.create(PackageMessage.class, this::handlePackage)});
    }

    /**
     * Blocks until the handler has seen a message with an offset at or beyond the upper bound,
     * then returns the collected messages.
     *
     * <p>If no such message is seen within {@code seedDelaySeconds}, a single test message
     * obtained from {@link QueueCacheSeeder#createTestMessage()} is sent and the method waits
     * again. NOTE: that second wait has no timeout; it only ends when the latch is released or
     * the calling thread is interrupted.
     *
     * <p>The poller is closed quietly before this method returns, whether it completes normally
     * or with an exception.
     *
     * @return the internal list of collected messages (not a copy)
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public List<FullMessage<PackageMessage>> fetchRange() throws InterruptedException {
        try {
            boolean rangeComplete = fetched.await(seedDelaySeconds, TimeUnit.SECONDS);
            if (!rangeComplete) {
                LOG.warn("Unable to find a message with offset >= maxOffset={}. Sending single seeding message.", maxOffset);
                PackageMessage msg = QueueCacheSeeder.createTestMessage();
                sender.send(msg);
                fetched.await();
            }
            LOG.info("Fetched offsets [{},{}[", minOffset, maxOffset);
            // The latch orders the handler's earlier list writes before this read.
            return messages;
        } finally {
            IOUtils.closeQuietly(headPoller);
        }
    }

    /**
     * Poller callback: stores in-range, non-test messages and releases the latch on the first
     * message at or beyond the upper bound.
     */
    private void handlePackage(MessageInfo info, PackageMessage message) {
        long offset = info.getOffset();
        LOG.debug("Consuming distribution package {} at offset={}", message, offset);
        if (offset >= maxOffset) {
            fetched.countDown();
            return;
        }
        if (isNotTestMessage(message)) {
            messages.add(new FullMessage<>(info, message));
        }
    }

    /** Returns {@code true} unless the message's request type is {@code TEST}. */
    private boolean isNotTestMessage(PackageMessage message) {
        return message.getReqType() != PackageMessage.ReqType.TEST;
    }
}

