import { StateDefinitions } from '../storage/state-declaration.js';
import { EventDeclarations } from '../event/event-declarations.js';
import { ManagementContext } from './management-context.js';
import { ManagementStreamData } from './management-stream-data.js';
import { StreamInterface } from './stream-interface.js';
import { createEventStore } from '../event/create-event-store.js';
import { createLazy } from './create-lazy.js';
import { StoredEvent } from '../event/stored-event.js';
import { generateId } from './generate-id.js';
import { SnapshotClient } from '../storage/snapshot-client.js';
import { createSnapshot } from './create-snapshot.js';
import { lazyWrap } from './lazy-wrap.js';

import { StreamPushOptions } from './stream-push-options.js';
import { ObservedClient } from '../storage/observer/observed-client.js';
import { observeClient } from '../storage/observer/observed-state.js';
import { nestedAbort } from '../queue/nested-abort.js';
import {
  stateCommitTopic,
  streamEventTopic,
  streamInitTopic,
} from './aggregation-attach-topic.js';
import { CustomError } from '../custom-error.js';
import { createInstanceStorage } from './create-instance-storage.js';
import { mapStateDeclarations } from './map-state-declarations.js';
import { createPermissionCache } from './create-permission-cache.js';
import { mapEventDeclarations } from './map-event-declarations.js';
import { MultiStorage } from '../storage/multi-storage.js';
import { TenantState } from './tenant-storage-factory.js';
import { RuntimeEnv } from '../runtime-env.js';
import { createLockKey } from '../lock/create-lock-key.js';

export function createStreamEventStore(opts: {
  tenantStorage: MultiStorage<TenantState>;
  env: RuntimeEnv;
  events: EventDeclarations;
}) {
  return createEventStore(opts.tenantStorage.storage('event'), {
    events: opts.events,
    env: opts.env,
  });
}

export function createStreamInterface<
  TState extends StateDefinitions,
  TEvent extends EventDeclarations,
>(
  management: ManagementContext,
  stream: ManagementStreamData<TState>,
  signal: AbortSignal,
): StreamInterface<TState, TEvent> {
  const lazyRegistration = createLazy(async () => {
    const id = await stream.identifier.resolve();

    const storage = createInstanceStorage(
      id,
      management.storageBuilder,
      stream.state,
      management.queue,
      signal,
    );

    const existing = await storage.management.snapshot('streams').get(
      {
        version: id.version,
        realm: id.realm,
        pattern: id.pattern,
        name: id.name,
        tenant: id.tenant,
      },
      { signal },
    );

    if (!existing) {
      await management.lock.acquire(
        createLockKey(id),
        async () => {
          const existing = await storage.management.snapshot('streams').get(
            {
              version: id.version,
              realm: id.realm,
              pattern: id.pattern,
              name: id.name,
              tenant: id.tenant,
            },
            { signal },
          );
          if (existing) {
            return;
          }

          console.log(
            `create new stream ${id.name}: realm: ${id.realm} pattern: ${id.pattern} version: ${id.version} args: ${id.args}`,
          );
          await storage.storage
            .storage('management')
            .transaction(async (trx) => {
              await trx.state('streams').insert({
                authId: id.auth.key,
                pattern: id.pattern,
                version: id.version,
                realm: id.realm,
                tenant: id.tenant,
                args: id.args,
                authSub: id.auth.sub,
                authAud: id.auth.aud,
                authIss: id.auth.iss,
                name: id.name,
                events: mapEventDeclarations(stream.events),
                state: mapStateDeclarations(stream.state),
              });

              for (const state of Object.keys(stream.state)) {
                await trx.state('permissions').insert({
                  pattern: id.pattern,
                  version: id.version,
                  realm: id.realm,
                  tenant: id.tenant,
                  name: id.name,
                  authId: id.auth.key,
                  type: `state:${state}`,
                  write: false,
                  read: true,
                });
              }

              for (const event of Object.keys(stream.events)) {
                await trx.state('permissions').insert({
                  pattern: id.pattern,
                  version: id.version,
                  realm: id.realm,
                  tenant: id.tenant,
                  name: id.name,
                  authId: id.auth.key,
                  type: `event:${event}`,
                  write: true,
                  read: true,
                });
              }
            }, signal);

          const queue = await management.queue.createTopic(streamInitTopic);
          await queue.publish(
            {
              identifier: id,
            },
            {
              attributes: {
                name: id.name,
                tenant: id.tenant,
                realm: id.realm,
                pattern: id.pattern,
                version: id.version,
              },
            },
          );
        },
        signal,
      );
    }

    const eventStore = createStreamEventStore({
      env: management.env,
      tenantStorage: management.tenantStorage,
      events: stream.events,
    });

    return {
      storage,
      identifier: id,
      eventStore,
      permissionCache: createPermissionCache(storage.management, id, signal),
    };
  });

  return {
    state: stream.state,
    async push<K extends string & keyof TEvent>(
      evt: K,
      data: TEvent[K]['type']['_output'],
      options: StreamPushOptions,
    ): Promise<StoredEvent> {
      const registration = await lazyRegistration.resolve();

      const eventPermission = await registration.permissionCache.resolve(
        registration.identifier.auth.key,
        evt,
      );

      if (!eventPermission.write) {
        throw new CustomError(`no permission`, null, {
          event: evt,
          authId: registration.identifier.auth.key,
        });
      }

      const event = await registration.eventStore.push(
        {
          pattern: registration.identifier.pattern,
          version: registration.identifier.version,
          realm: registration.identifier.realm,
          tenant: registration.identifier.tenant,
        },
        evt,
        data,
        {
          createdAt: options?.createdAt ?? new Date(),
          id: options?.id ?? generateId(),
          auth: registration.identifier.auth,
          annotations: options?.annotations ?? {},
          signal,
        },
      );

      const queue = await management.queue.createTopic(streamEventTopic);
      await queue.publish(
        {
          eventId: event.id,
          identifier: registration.identifier,
        },
        {
          attributes: {
            tenant: management.tenant,
            realm: registration.identifier.realm,
            name: registration.identifier.name,
            pattern: registration.identifier.pattern,
            version: registration.identifier.version,
          },
        },
      );

      if (options.await) {
        // TODO create sub with signal, and remove
        const sub = await management.queue.createSubscription(
          stateCommitTopic,
          `${management.tenant}:${registration.identifier.realm}:stream:${registration.identifier.name}:${event.id}`,
          {
            autoRemove: true,
            filter: {
              tenant: management.tenant,
              realm: registration.identifier.realm,
              version: registration.identifier.version,
              eventId: event.id,
              name: registration.identifier.name,
            },
          },
        );

        try {
          // TODO improve with async get
          await new Promise<void>((resolve, reject) => {
            try {
              const abort = nestedAbort(signal);
              sub
                .consume(
                  async () => {
                    resolve();
                    abort.abort();
                  },
                  { signal: abort.signal },
                )
                .ready.then((consumer) => consumer.done)
                .catch(reject);
            } catch (e) {
              reject(e);
            }
          });
        } finally {
          await sub.remove();
        }
      }

      return event;
    },
    async get(id: string): Promise<StoredEvent | null> {
      const registration = await lazyRegistration.resolve();

      const event = await registration.eventStore.get(
        {
          pattern: registration.identifier.pattern,
          version: registration.identifier.version,
          realm: registration.identifier.realm,
          tenant: registration.identifier.tenant,
        },
        id,
        { signal },
      );

      if (event === null) {
        return null;
      }

      const eventPermission = await registration.permissionCache.resolve(
        registration.identifier.auth.key,
        event?.event,
      );

      if (!eventPermission.read) {
        return null;
      }

      return event;
    },
    // async *iterate(date: Date | null, abort): AsyncGenerator<StoredEvent> {
    //   const registration = await lazyRegistration.resolve();
    //   for await (const evt of registration.eventStore.range(
    //     registration.identifier.key,
    //     {
    //       startAt: date,
    //       continuous: abort,
    //     },
    //   )) {
    //     yield evt;
    //   }
    // },
    observe<K extends string & keyof TState>(
      name: K,
    ): ObservedClient<TState[K]> {
      return observeClient(
        stream.state[name],
        lazyWrap(lazyRegistration, (r) => r.storage.stateStorage),
        name,
      );
    },
    snapshot<K extends string & keyof TState>(
      name: K,
    ): SnapshotClient<TState[K]> {
      return createSnapshot(
        lazyWrap(lazyRegistration, (r) => r.storage.stateStorage),
        stream.state,
        name,
      );
    },
  };
}
