package com.atlassian.jira.plugins.dvcs.service;

import com.atlassian.beehive.ClusterLock;
import com.atlassian.beehive.ClusterLockService;
import com.atlassian.beehive.compat.ClusterLockServiceFactory;
import com.atlassian.jira.plugins.dvcs.event.RepositorySync;
import com.atlassian.jira.plugins.dvcs.event.RepositorySyncHelper;
import com.atlassian.jira.plugins.dvcs.model.DiscardReason;
import com.atlassian.jira.plugins.dvcs.model.Message;
import com.atlassian.jira.plugins.dvcs.model.Progress;
import com.atlassian.jira.plugins.dvcs.model.Repository;
import com.atlassian.jira.plugins.dvcs.service.message.AbstractMessagePayloadSerializer;
import com.atlassian.jira.plugins.dvcs.service.message.HasProgress;
import com.atlassian.jira.plugins.dvcs.service.message.MessageConsumer;
import com.atlassian.jira.plugins.dvcs.service.message.MessagingService;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/atlassian/jira/plugins/dvcs/service/MessageExecutor.class */
/**
 * Pulls queued messages from the {@link MessagingService} and runs them against their
 * {@link MessageConsumer}s on an {@link ExecutorService}.
 *
 * <p>Every consumer registered in {@link #init()} gets a token counter initialised from
 * {@link MessageConsumer#getParallelThreads()}. A message is submitted for execution only after a
 * token has been acquired for its consumer; the token is handed back when the submitted runnable
 * finishes, at which point the next message for the same consumer is looked up.
 *
 * <p>Selection of the next message is guarded by a cluster lock obtained from the
 * {@link ClusterLockService}.
 */
public class MessageExecutor {
    // NOTE(review): the logger is named after MessageConsumer, not MessageExecutor. Left untouched
    // because renaming it would change the log category - confirm whether this is intentional.
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageConsumer.class);

    /** Name of the cluster lock held while the next message of a consumer is being selected. */
    private static final String PROCESS_MESSAGE_LOCK = MessageExecutor.class.getName() + ".processMessage";

    /** Retry count at or above which a failed message is discarded as RETRY_COUNT_EXCEEDED. */
    private static final int MAX_RETRIES_BEFORE_DISCARD = 3;

    private final ExecutorService executor;

    /** Resolved from {@link #clusterLockServiceFactory} in {@link #init()}. */
    private ClusterLockService clusterLockService;

    @Resource
    private ClusterLockServiceFactory clusterLockServiceFactory;

    @Resource
    private MessagingService messagingService;

    @Resource
    private MessageConsumer<?>[] consumers;

    @Resource
    private RepositorySyncHelper repoSyncHelper;

    /** Consumers grouped by the id of the address they listen on. */
    private final ConcurrentMap<String, List<MessageConsumer<?>>> messageAddressToConsumers =
            new ConcurrentHashMap<>();

    /** Number of messages each consumer may still start concurrently. */
    private final ConcurrentMap<MessageConsumer<?>, AtomicInteger> consumerToRemainingTokens =
            new ConcurrentHashMap<>();

    /** Set by {@link #destroy()}; once true no further messages are picked up. */
    private volatile boolean stop;

    /**
     * Runnable that processes a single message with a single consumer and reports the outcome
     * back to the messaging service.
     *
     * @param <P> payload type of the processed message
     */
    public final class MessageRunnable<P extends HasProgress> extends ReleaseTokenAndEnqueueNextMessage {
        private final Message<P> message;
        private final MessageConsumer<P> consumer;

        public MessageRunnable(Message<P> message, MessageConsumer<P> messageConsumer) {
            super();
            this.message = message;
            this.consumer = messageConsumer;
        }

        @Override
        public MessageConsumer<P> getConsumer() {
            return this.consumer;
        }

        /**
         * Deserializes the payload, delivers it to the consumer and marks the message as ok or
         * failed. The repository sync is finished and progress end is attempted exactly once,
         * whatever the outcome of the delivery.
         *
         * <p>A message whose payload cannot be deserialized is discarded and the exception is
         * rethrown. Any other failure is logged and recorded on the progress; only {@link Error}s
         * are propagated to the caller.
         */
        @Override
        protected void doRun() {
            try {
                // Assumes deserializePayload returns the message's payload type P - TODO confirm.
                P payload = MessageExecutor.this.messagingService.deserializePayload(this.message);
                Progress progress = payload.getProgress();
                Repository repository = MessageExecutor.this.messagingService.getRepositoryFromMessage(this.message);
                RepositorySync sync = MessageExecutor.this.repoSyncHelper.startSync(
                        repository, progress.isSoftsync() && payload.isSoftSync());
                try {
                    this.consumer.onReceive(this.message, payload);
                    MessageExecutor.this.messagingService.ok(this.consumer, this.message);
                } catch (Throwable t) {
                    LOGGER.error("Synchronization failed: " + t.getMessage(), t);
                    MessageExecutor.this.messagingService.fail(this.consumer, this.message, t);
                    if (this.message.getRetriesCount() >= MAX_RETRIES_BEFORE_DISCARD) {
                        MessageExecutor.this.messagingService.discard(
                                this.consumer, this.message, DiscardReason.RETRY_COUNT_EXCEEDED);
                    }
                    progress.setError("Error during sync. See server logs.");
                    // Exceptions are considered handled at this point; Errors must still surface.
                    Throwables.propagateIfInstanceOf(t, Error.class);
                } finally {
                    // Cleanup lives in finally so that it runs once and so that a failure inside
                    // it is never mistaken for a failure of the consumer.
                    sync.finish();
                    tryEndProgress(repository, this.message, this.consumer, progress);
                }
            } catch (AbstractMessagePayloadSerializer.MessageDeserializationException e) {
                MessageExecutor.this.messagingService.discard(
                        this.consumer, this.message, DiscardReason.FAILED_DESERIALIZATION);
                throw e;
            }
        }

        /**
         * Asks the messaging service to end the progress for the repository. Does nothing when
         * the repository is {@code null}; runtime exceptions are logged and not rethrown.
         */
        protected void tryEndProgress(Repository repository, Message<P> message, MessageConsumer<P> messageConsumer, Progress progress) {
            if (repository == null) {
                return;
            }
            try {
                int auditId = MessageExecutor.this.messagingService.getSynchronizationAuditIdFromTags(message.getTags());
                MessageExecutor.this.messagingService.tryEndProgress(repository, progress, messageConsumer, auditId);
            } catch (RuntimeException e) {
                LOGGER.error(e.getMessage(), e);
            }
        }
    }

    /**
     * Base runnable that, once {@link #doRun()} has finished (normally or not), releases the
     * consumer's token and tries to schedule the consumer's next message.
     */
    private abstract class ReleaseTokenAndEnqueueNextMessage implements Runnable {
        private ReleaseTokenAndEnqueueNextMessage() {
        }

        @Override
        public final void run() {
            try {
                doRun();
            } finally {
                // A single finally guarantees the token is released exactly once, even when
                // scheduling the next message throws.
                MessageConsumer<?> finishedConsumer = getConsumer();
                MessageExecutor.this.releaseToken(finishedConsumer);
                MessageExecutor.this.tryToProcessNextMessage(finishedConsumer);
            }
        }

        /** Performs the actual work of this runnable. */
        protected abstract void doRun();

        /** @return the consumer whose token this runnable holds */
        protected abstract MessageConsumer<?> getConsumer();
    }

    /** Creates an executor backed by the default thread pool of this class. */
    public MessageExecutor() {
        this(createThreadPoolExecutor());
    }

    /**
     * Creates an executor that submits message runnables to the given service.
     *
     * @param executorService executor used to run messages; must not be {@code null}
     */
    @VisibleForTesting
    public MessageExecutor(@Nonnull ExecutorService executorService) {
        this.executor = (ExecutorService) Preconditions.checkNotNull(executorService, "executor");
    }

    /**
     * Resolves the cluster lock service and registers every injected consumer: the consumer is
     * indexed by the id of its address and given a token counter initialised from
     * {@link MessageConsumer#getParallelThreads()}.
     */
    @PostConstruct
    public void init() {
        this.clusterLockService = this.clusterLockServiceFactory.getClusterLockService();
        for (MessageConsumer<?> consumer : this.consumers) {
            String addressId = consumer.getAddress().getId();
            List<MessageConsumer<?>> consumersForAddress = this.messageAddressToConsumers.get(addressId);
            if (consumersForAddress == null) {
                List<MessageConsumer<?>> created = new CopyOnWriteArrayList<>();
                // Honour the result of putIfAbsent so the consumer always ends up in the list
                // that is actually stored in the map.
                List<MessageConsumer<?>> raced = this.messageAddressToConsumers.putIfAbsent(addressId, created);
                consumersForAddress = raced != null ? raced : created;
            }
            consumersForAddress.add(consumer);
            this.consumerToRemainingTokens.put(consumer, new AtomicInteger(consumer.getParallelThreads()));
        }
    }

    /**
     * Stops picking up new messages, shuts the executor down immediately and waits up to one
     * minute for it to terminate, logging an error if it does not.
     *
     * @throws Exception if the wait for termination is interrupted
     */
    @PreDestroy
    public void destroy() throws Exception {
        this.stop = true;
        this.executor.shutdownNow();
        if (!this.executor.awaitTermination(1L, TimeUnit.MINUTES)) {
            LOGGER.error("Unable properly shutdown message queue.");
        }
    }

    /**
     * Tries to start processing of the next message for every consumer registered on the given
     * address. An address without registered consumers is ignored.
     *
     * @param str id of the address on which a message became available
     */
    public void notify(String str) {
        List<MessageConsumer<?>> consumersForAddress = this.messageAddressToConsumers.get(str);
        if (consumersForAddress == null) {
            LOGGER.debug("No consumers registered for address {}", str);
            return;
        }
        for (MessageConsumer<?> consumer : consumersForAddress) {
            tryToProcessNextMessage(consumer);
        }
    }

    /**
     * Selects the next message for the consumer and, if a token is available, marks it as running
     * and submits it to the executor. Does nothing after {@link #destroy()} has been called, when
     * there is no message, or when the consumer has no free token.
     *
     * <p>Message selection, token acquisition and the "running" marker happen under the cluster
     * lock; the lock is released exactly once, before the runnable is handed to the executor.
     *
     * @param messageConsumer consumer to find work for
     * @param <P> payload type of the consumer
     */
    public <P extends HasProgress> void tryToProcessNextMessage(MessageConsumer<P> messageConsumer) {
        if (this.stop) {
            return;
        }
        Message<P> nextMessage;
        ClusterLock lock = this.clusterLockService.getLockForName(PROCESS_MESSAGE_LOCK);
        lock.lock();
        try {
            nextMessage = this.messagingService.getNextMessageForConsuming(
                    messageConsumer, messageConsumer.getAddress().getId());
            if (nextMessage == null) {
                // Nothing waiting for this consumer.
                return;
            }
            if (!acquireToken(messageConsumer)) {
                // Consumer is already running as many messages as it allows.
                return;
            }
            this.messagingService.running(messageConsumer, nextMessage);
        } finally {
            lock.unlock();
        }
        this.executor.execute(new MessageRunnable<P>(nextMessage, messageConsumer));
    }

    /**
     * Atomically takes one token of the consumer.
     *
     * @return {@code true} if a token was taken, {@code false} if none was left
     */
    private <P extends HasProgress> boolean acquireToken(MessageConsumer<P> messageConsumer) {
        AtomicInteger remaining = this.consumerToRemainingTokens.get(messageConsumer);
        while (true) {
            int available = remaining.get();
            if (available <= 0) {
                return false;
            }
            if (remaining.compareAndSet(available, available - 1)) {
                return true;
            }
        }
    }

    /** Returns one token to the consumer's counter. */
    public <P extends HasProgress> void releaseToken(MessageConsumer<P> messageConsumer) {
        this.consumerToRemainingTokens.get(messageConsumer).incrementAndGet();
    }

    /**
     * Builds the default pool: core size 1, maximum size {@link Integer#MAX_VALUE}, five minute
     * keep-alive, unbounded {@link LinkedBlockingQueue}.
     *
     * <p>NOTE(review): per the ThreadPoolExecutor documentation a pool with an unbounded queue
     * never grows beyond its core size, so this pool runs at most one task at a time regardless
     * of the maximum size. Left unchanged - confirm whether serial execution is intended.
     */
    private static ThreadPoolExecutor createThreadPoolExecutor() {
        return new ThreadPoolExecutor(1, Integer.MAX_VALUE, 5L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>());
    }
}
