import { RealmRegistry } from '../realm/create-realm-registry.js';
import { Queue } from '../queue/queue.js';
import { Lock } from '../lock/lock.js';
import { combineRunnable, Runnable } from '../queue/runnable.js';
import { createTopicSubscribeAndConsume } from './create-topic-subscribe-and-consume.js';
import { RuntimeEnv } from '../runtime-env.js';
import {
  aggregationAttachTopic,
  aggregationEventTopic,
  aggregationInitTopic,
  stateCommitTopic,
  streamEventTopic,
  streamInitTopic,
} from './aggregation-attach-topic.js';
import { createInstanceStorage } from './create-instance-storage.js';
import { applyRuntimeResultToInstance } from './apply-runtime-result-to-instance.js';
import { Management } from './management.js';
import { InstanceIdentifier } from './instance-identifier.js';
import { EventSource } from '../realm/event-source.js';
import { mapStateDefinition } from './map-state-declarations.js';
import { firstOfRange } from '../storage/first-of-range.js';
import { mapEventDefinition } from './map-event-declarations.js';
import { StorageBuilder } from '../storage/tracker/storage-builder.js';
import { createTenantStorage } from './create-tenant-storage.js';
import { createLazy } from './create-lazy.js';
import { createStreamEventStore } from './create-stream-interface.js';
import { createLockKey } from '../lock/create-lock-key.js';

export interface WorkerOptions {
  storageBuilder: StorageBuilder;
  registry: RealmRegistry;
  management: Management;
  name: string;
  queue: Queue;
  lock: Lock;
  signal: AbortSignal;
  env: RuntimeEnv;
}

export function createWorker(opts: WorkerOptions): Runnable<void> {
  const tenantStorage = createTenantStorage(opts.storageBuilder, opts.signal);

  const commitTopicLazy = createLazy(() =>
    opts.queue.createTopic(stateCommitTopic),
  );
  const aggEventTopicLazy = createLazy(() =>
    opts.queue.createTopic(aggregationEventTopic),
  );

  const aggregationAttachTopicLazy = createLazy(() => {
    return opts.queue.createTopic(aggregationAttachTopic);
  });

  return combineRunnable([
    createTopicSubscribeAndConsume(
      {
        queue: opts.queue,
        topic: aggregationAttachTopic,
        subscription: 'worker-aggregation-attach',
        signal: opts.signal,
        filter: {},
        autoRemove: false,
      },
      async (message, messageOpts) => {
        console.log(message);
        const stream = await opts.registry.getStream(message.data.stream);
        const aggregation = await opts.registry.getAggregation(
          message.data.aggregation,
        );

        if (!stream) {
          return;
        }

        if (!aggregation) {
          return;
        }

        const eventStore = createStreamEventStore({
          env: opts.env,
          id: message.data.stream,
          tenantStorage,
          events: mapEventDefinition(stream.events),
        });
        const stateDefinitions = mapStateDefinition(aggregation.state);
        const aggregationStorage = createInstanceStorage(
          message.data.aggregation,
          opts.storageBuilder,
          stateDefinitions,
          messageOpts.signal,
        );

        await opts.lock.acquire(
          createLockKey(message.data.aggregation),
          async () => {
            // await aggregationStorage.management
            //   .state('aggregationSources')
            //   .insert({
            //     stream: message.data.stream.key,
            //     aggregation: message.data.aggregation.key,
            //     realm: message.data.aggregation.realm,
            //   });

            const start = await firstOfRange(
              aggregationStorage.processorStorage
                .snapshot('operations')
                .index('eventAt')
                .range(null, { dir: 'prev' }),
            );

            console.log('check for events to attach');
            for await (const evt of eventStore.range(
              {
                realm: message.data.stream.realm,
                version: message.data.stream.version,
                pattern: message.data.stream.pattern,
                tenant: message.data.stream.tenant,
              },
              message.data.aggregation.auth,
              start?.data.appliedAt ?? null,
            )) {
              console.log('process operation ' + evt.id);
              const streamHost = aggregation.runtimes[evt.pattern];
              if (streamHost) {
                await applyRuntimeResultToInstance(
                  streamHost,
                  { storage: aggregationStorage.storage as any },
                  message.data.aggregation,
                  evt,
                  opts.env,
                  '', // TODO set value
                );
              } else {
                // TODO update bookmark
              }
            }
            console.log('ended for check');
          },
        );
      },
    ),
    createTopicSubscribeAndConsume(
      {
        queue: opts.queue,
        topic: aggregationEventTopic,
        subscription: 'worker-aggregation-event',
        signal: opts.signal,
        filter: {},
        autoRemove: false,
      },
      async (message, messageOpts) => {
        console.log('agg.event', message);
        const evt = message.data.event;

        for await (const aggregationDefinition of opts.registry.listAggregation(
          message.data.stream,
        )) {
          for await (const aggregation of opts.management.listAggregations(
            message.data.stream.tenant,
            aggregationDefinition.identifier,
            {
              signal: messageOpts.signal,
              auth: message.data.stream.auth,
            },
          )) {
            if (hasStreamSource(aggregation.events, message.data.stream)) {
              // TODO replace with aggregation source

              const stateDefinitions = mapStateDefinition(aggregation.state);
              const aggregationStorage = createInstanceStorage(
                aggregation.identifier,
                opts.storageBuilder,
                stateDefinitions,
                messageOpts.signal,
              );

              const streamHost = aggregationDefinition.runtimes[evt.pattern];
              if (streamHost) {
                await opts.lock.acquire(
                  createLockKey(aggregation.identifier),
                  async () => {
                    await applyRuntimeResultToInstance(
                      streamHost,
                      { storage: aggregationStorage.storage as any },
                      aggregation.identifier,
                      evt,
                      opts.env,
                      '', // TODO set value
                    );
                  },
                );
              } else {
                // TODO update bookmark
              }
            }
          }
        }
      },
    ),
    createTopicSubscribeAndConsume(
      {
        queue: opts.queue,
        topic: streamEventTopic,
        subscription: 'worker-stream-event',
        signal: opts.signal,
        filter: {},
        autoRemove: false,
      },
      async (message, consumeOpts) => {
        console.log('stream.attach', message);
        const evt = message.data.event;
        const stream = await opts.registry.getStream(message.data.identifier);

        if (!stream) {
          return;
        }

        const stateDefinitions = mapStateDefinition(stream.state);
        const streamStorage = createInstanceStorage(
          message.data.identifier,
          opts.storageBuilder,
          stateDefinitions,
          consumeOpts.signal,
        );

        await opts.lock.acquire(
          createLockKey(message.data.identifier),
          async () => {
            await applyRuntimeResultToInstance(
              stream.runtime,
              { storage: streamStorage.storage as any },
              message.data.identifier,
              evt,
              opts.env,
              '', // TODO set value
            );
          },
        );

        const commit = await commitTopicLazy.resolve();
        await commit.publish(
          {
            eventId: evt.id,
            identifier: message.data.identifier,
          },
          {
            attributes: {
              name: message.attributes.name,
              tenant: message.data.identifier.tenant,
              realm: message.data.identifier.realm,
              version: message.data.identifier.version,
              eventId: evt.id,
            },
          },
        );

        const aggEvent = await aggEventTopicLazy.resolve();
        await aggEvent.publish(
          {
            event: evt,
            stream: message.data.identifier,
          },
          {
            attributes: {
              tenant: message.data.identifier.tenant,
              realm: message.data.identifier.realm,
            },
          },
        );
      },
    ),
    createTopicSubscribeAndConsume(
      {
        queue: opts.queue,
        topic: streamInitTopic,
        subscription: 'worker-stream-init',
        signal: opts.signal,
        filter: {},
        autoRemove: false,
      },
      async (message, messageOpts) => {
        const aggAttachTopic = await aggregationAttachTopicLazy.resolve();

        for await (const aggregationDefinition of opts.registry.listAggregation(
          message.data.identifier,
        )) {
          for await (const aggregation of opts.management.listAggregations(
            message.data.identifier.tenant,
            aggregationDefinition.identifier,
            {
              signal: messageOpts.signal,
              auth: message.data.identifier.auth,
            },
          )) {
            if (hasStreamSource(aggregation.events, message.data.identifier)) {
              await aggAttachTopic.publish(
                {
                  stream: message.data.identifier,
                  aggregation: aggregation.identifier,
                },
                {
                  attributes: {
                    version: aggregation.identifier.version,
                    pattern: aggregation.identifier.pattern,
                    realm: aggregation.identifier.realm,
                    tenant: aggregation.identifier.tenant,
                  },
                },
              );
            }
          }
        }
      },
    ),
    createTopicSubscribeAndConsume(
      {
        queue: opts.queue,
        topic: aggregationInitTopic,
        subscription: 'worker-aggregation-init',
        signal: opts.signal,
        filter: {},
        autoRemove: false,
      },
      async (message) => {
        console.log('agg.init', message);
        const aggregation = await opts.registry.getAggregation(
          message.data.identifier,
        );
        if (!aggregation) {
          return;
        }

        const aggAttachTopic = await aggregationAttachTopicLazy.resolve();
        for (const evnt of Object.values(aggregation.events)) {
          for await (const stream of opts.management.listStreams(
            message.data.identifier.tenant,
            {
              ...evnt.stream,
              tenant: aggregation.identifier.tenant,
              realm: aggregation.identifier.realm, // TODO remove and move into source.stream
            },
            {
              auth: message.data.identifier.auth,
            },
          )) {
            if (hasStreamSource(aggregation.events, stream.identifier)) {
              await aggAttachTopic.publish(
                {
                  stream: stream.identifier,
                  aggregation: message.data.identifier,
                },
                {
                  attributes: {
                    version: message.data.identifier.version,
                    pattern: message.data.identifier.pattern,
                    realm: message.data.identifier.realm,
                    tenant: message.data.identifier.tenant,
                  },
                },
              );
            }
          }
        }
      },
    ),
  ]);
}

function hasStreamSource(
  events: Record<string, EventSource>,
  stream: InstanceIdentifier,
) {
  for (const e of Object.values(events)) {
    if (
      e.stream.pattern === stream.pattern &&
      e.stream.version === stream.version
    ) {
      return true;
    }
  }
  return false;
}
