/*
 * Decompiled with CFR 0.152.
 */
package com.inet.persistence.azure.cosmos;

import com.azure.cosmos.CosmosContainer;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.ExcludedPath;
import com.azure.cosmos.models.FeedRange;
import com.azure.cosmos.models.IndexingPolicy;
import com.inet.annotations.JsonData;
import com.inet.id.GUID;
import com.inet.lib.json.Json;
import com.inet.logging.LogID;
import com.inet.persistence.PersistenceListener;
import com.inet.persistence.azure.cosmos.AzureCosmosPersistence;
import com.inet.persistence.spi.PersistenceLogger;
import com.inet.persistence.spi.events.PersistenceListenerContainer;
import java.util.Arrays;
import java.util.Objects;
import javax.annotation.Nonnull;

/**
 * Publish/subscribe channel built on a Cosmos container and its change feed.
 *
 * <p>{@link #send(Object, String)} writes an event document into the container;
 * a background thread started by {@link #start(String)} polls the container's
 * change feed and forwards events written by other clients to the listeners
 * registered in the {@link PersistenceListenerContainer}.
 */
class AzureCosmosPublishSubscribe {
    /** Container id; also used as the name of the polling thread. */
    private static final String CHANNEL = "inet-events";
    /** Default time-to-live, in seconds, configured on the event container. */
    private static final int EVENT_TTL_SECONDS = 60;
    /** Pause, in milliseconds, between two change feed polls. */
    private static final long POLL_INTERVAL_MILLIS = 1000L;

    private final PersistenceListenerContainer listeners;
    private CosmosContainer collection;
    /**
     * Set by {@link #stop()} and read by the polling thread; volatile so the
     * write made on the stopping thread is guaranteed to become visible to the
     * polling loop.
     */
    private volatile boolean stopped;

    /**
     * Creates the channel.
     *
     * @param listeners registry that is looked up by event type name to find
     *                  the target event class and the listeners to notify
     */
    AzureCosmosPublishSubscribe(PersistenceListenerContainer listeners) {
        this.listeners = listeners;
    }

    /**
     * Obtains (or creates) the event container and starts the daemon thread
     * that polls its change feed.
     *
     * @param clientID id of this client; events carrying the same client id
     *                 are skipped and not dispatched to the listeners
     */
    void start(@Nonnull String clientID) {
        CosmosContainerProperties props = new CosmosContainerProperties(CHANNEL, "/type");
        props.setDefaultTimeToLiveInSeconds(EVENT_TTL_SECONDS);
        // Exclude every path from indexing.
        IndexingPolicy indexing = new IndexingPolicy();
        indexing.setExcludedPaths(Arrays.asList(new ExcludedPath("/*")));
        props.setIndexingPolicy(indexing);
        this.collection = AzureCosmosPersistence.getOrCreateContainer(props);

        Thread thread = new Thread(() -> this.pollChangeFeed(clientID), CHANNEL);
        thread.setDaemon(true);
        thread.start();
    }

    /**
     * Polling loop executed on the background thread. Runs until
     * {@link #stop()} was called or an error occurs; an error ends the loop
     * and is logged unless the channel was already stopped.
     *
     * @param clientID id of this client, used to skip its own events
     */
    private void pollChangeFeed(String clientID) {
        LogID.setID("pubsub");
        // One-element array so the page handler lambda can update the token.
        String[] continuation = new String[1];
        try {
            while (!this.stopped) {
                // First poll starts from "now"; later polls resume from the last token seen.
                CosmosChangeFeedRequestOptions options = continuation[0] == null
                        ? CosmosChangeFeedRequestOptions.createForProcessingFromNow(FeedRange.forFullRange())
                        : CosmosChangeFeedRequestOptions.createForProcessingFromContinuation(continuation[0]);
                for (EventPOJO event : this.collection.queryChangeFeed(options, EventPOJO.class)
                        .handle(response -> continuation[0] = response.getContinuationToken())) {
                    if (Objects.equals(event.client, clientID)) {
                        // Written by this client itself.
                        continue;
                    }
                    this.onMessage(event);
                }
                Thread.sleep(POLL_INTERVAL_MILLIS);
            }
        } catch (Throwable th) {
            if (th instanceof InterruptedException) {
                // Keep the interrupt status instead of silently consuming it.
                Thread.currentThread().interrupt();
            }
            // Failures after stop() are expected noise and are not reported.
            if (!this.stopped) {
                PersistenceLogger.LOGGER.error(th);
            }
        }
    }

    /**
     * Requests the polling thread to end. The loop checks the flag once per
     * iteration, so the thread does not terminate immediately.
     */
    void stop() {
        this.stopped = true;
    }

    /**
     * Deserializes a received event and hands it to every listener registered
     * for its type. Events without a type, without registered listeners or
     * without a message are ignored.
     *
     * @param evt the event document read from the change feed
     */
    private void onMessage(EventPOJO evt) {
        String type = evt.type;
        if (type == null) {
            return;
        }
        PersistenceListenerContainer.ListenerDescription desc = this.listeners.get(type);
        if (desc == null) {
            return;
        }
        String message = evt.message;
        if (message == null) {
            return;
        }
        Object event = new Json().fromJson(message, desc.getType());
        for (PersistenceListener listener : desc) {
            listener.eventReceived(event);
        }
    }

    /**
     * Publishes an event by creating a document in the event container. The
     * event's class name is stored as its type and its JSON form as the message.
     * The container is only assigned in {@link #start(String)}.
     *
     * @param event    the event to publish; must not be {@code null}
     * @param clientID id of the sending client, stored with the event
     * @param <T>      the event type
     */
    <T> void send(T event, String clientID) {
        EventPOJO evt = new EventPOJO();
        evt.client = clientID;
        evt.type = event.getClass().getName();
        evt.message = new Json().toJson(event);
        this.collection.createItem(evt);
    }

    /**
     * Document stored in the event container. The field names are part of the
     * stored JSON and must not be renamed.
     */
    @JsonData
    private static class EventPOJO {
        /** Document id, a freshly generated GUID per instance. */
        public String id = GUID.generateNew().toString();
        /** Id of the client that sent the event. */
        public String client;
        /** Class name of the event. */
        public String type;
        /** JSON serialized event. */
        public String message;

        private EventPOJO() {
        }
    }
}

