import { StateDeclarations } from '../../storage/state-declaration.js';
import {
  AggregateHostRuntime,
  StreamHostRuntime,
} from '../../runtime/host-runtime.js';
import { EventDeclarations } from '../../event/event-declarations.js';
import { ManagementOptions } from '../management-options.js';
import { StreamInterface } from '../stream-interface.js';
import { AggregateInterface } from '../aggregate-interface.js';
import {
  createLazyAuthIdentifier,
  createLazyInstanceIdentifier,
} from '../create-lazy-instance-identifier.js';
import { createStreamInterface } from '../create-stream-interface.js';
import { createAggregateInterface } from '../create-aggregate-interface.js';
import { StreamInstanceOptions } from '../stream-instance-options.js';
import { Management } from '../management.js';
import { lazyWrap } from '../lazy-wrap.js';
import { StreamDescription } from '../stream-description.js';
import { AggregationDescription } from '../aggregation-description.js';
import { createManagementContext } from '../create-management-context.js';
import { mapAggregationEventSource } from '../map-aggregation-event-source.js';
import { createLazy } from '../create-lazy.js';
import {
  AggregationRegistration,
  RealmRegistry,
  StreamRegistration,
} from '../../realm/create-realm-registry.js';
import { mapStateDefinition } from '../map-state-declarations.js';
import { mapEventDefinition } from '../map-event-declarations.js';
import { resolvedLazy } from '../resolved-lazy.js';
import { DefinitionIdentifier } from '../../realm/definition-identifier.js';
import { createDefinitionStorage } from '../create-instance-storage.js';
import { createTenantStorage } from '../create-tenant-storage.js';
import { ListStreamsOptions } from '../list-streams-options.js';
import { PartitionBuilder } from '../create-partition-storage.js';

/**
 * Resolves the definition storage for a registered stream.
 *
 * Asks the registry for the stream registration matching `identifier` and,
 * when one is found, builds its storage from the registration's own
 * identifier and its mapped state definition.
 *
 * @param opts - Registry to query, storage builder to build on, and the
 *   abort signal forwarded to both the lookup and the storage creation.
 * @param identifier - Definition to look up in the registry.
 * @returns The value produced by `createDefinitionStorage`, or `undefined`
 *   when the registry lookup yields nothing.
 */
async function getStreamStorage(
  opts: {
    registry: RealmRegistry;
    storageBuilder: PartitionBuilder;
    signal: AbortSignal;
  },
  identifier: DefinitionIdentifier,
) {
  const registration = await opts.registry.getStream(identifier, opts.signal);
  return registration
    ? createDefinitionStorage(
        registration.identifier,
        opts.storageBuilder,
        mapStateDefinition(registration.state),
        opts.signal,
      )
    : undefined;
}

/**
 * Resolves the definition storage for a registered aggregation.
 *
 * Asks the registry for the aggregation registration matching `identifier`
 * and, when one is found, builds its storage from the registration's own
 * identifier and its mapped state definition.
 *
 * @param opts - Registry to query, storage builder to build on, and the
 *   abort signal forwarded to both the lookup and the storage creation.
 * @param identifier - Definition to look up in the registry.
 * @returns The value produced by `createDefinitionStorage`, or `undefined`
 *   when the registry lookup yields nothing.
 */
async function getAggregationStorage(
  opts: {
    registry: RealmRegistry;
    storageBuilder: PartitionBuilder;
    signal: AbortSignal;
  },
  identifier: DefinitionIdentifier,
) {
  const aggregation = await opts.registry.getAggregation(
    identifier,
    opts.signal,
  );
  return aggregation
    ? createDefinitionStorage(
        aggregation.identifier,
        opts.storageBuilder,
        mapStateDefinition(aggregation.state),
        opts.signal,
      )
    : undefined;
}

/**
 * Creates the {@link Management} implementation bound to `opts`.
 *
 * The returned object offers two kinds of operations:
 * - `listStreams` / `listAggregations` iterate the rows of the `management`
 *   storage snapshots for a given definition;
 * - the `get*` methods build a stream or aggregate interface, either from a
 *   registry registration or from a host runtime.
 *
 * @param opts - Management options; `env`, `storageBuilder` and the values
 *   consumed by the storage lookups and `createManagementContext` are read
 *   from here.
 */
export function createManagement(opts: ManagementOptions): Management {
  // Every instance accessor needs the same thing: a tenant-scoped storage
  // wrapped in a management context. Built on demand, once per call.
  const contextFor = (tenant: string, options: StreamInstanceOptions) =>
    createManagementContext(
      opts,
      createTenantStorage(opts.storageBuilder, tenant, options.signal),
      tenant,
    );

  return {
    /**
     * Yields a description for each row of the `streams` snapshot that
     * matches the tenant, the definition's realm/pattern/version and the
     * resolved auth key. Completes without yielding when no storage can be
     * resolved for `identifier`.
     */
    async *listStreams(
      tenant: string,
      identifier: DefinitionIdentifier,
      options: ListStreamsOptions,
    ): AsyncGenerator<StreamDescription> {
      const lazyAuth = createLazyAuthIdentifier({
        auth: options?.auth,
        env: opts.env,
      });
      const definitionStorage = await getStreamStorage(opts, identifier);
      if (!definitionStorage) {
        return;
      }
      // Narrow to the definition first; the auth key is resolved afterwards.
      const byDefinition = definitionStorage
        .storage('management')
        .snapshot('streams')
        .index('authId')
        .filter('tenant', tenant)
        .filter('realm', identifier.realm)
        .filter('pattern', identifier.pattern)
        .filter('version', identifier.version);
      const authKey = await lazyWrap(lazyAuth, (auth) => auth.key).resolve();
      const rows = byDefinition
        .filter('authId', authKey)
        .range(null, { signal: options.signal });
      for await (const row of rows) {
        yield {
          identifier: {
            tenant: tenant,
            version: row.data.version,
            pattern: row.data.pattern,
            realm: row.data.realm,
            args: row.data.args,
            type: 'stream',
            auth: {
              key: row.data.authId,
              aud: row.data.authAud,
              sub: row.data.authSub,
              iss: row.data.authIss,
            },
            name: row.data.name,
          },
          events: row.data.events,
          state: row.data.state,
        };
      }
    },

    /**
     * Yields a description for each row of the `aggregations` snapshot that
     * matches the tenant, the definition's realm/pattern/version and the
     * resolved auth key. Completes without yielding when no storage can be
     * resolved for `identifier`.
     */
    async *listAggregations(
      tenant: string,
      identifier: DefinitionIdentifier,
      options: ListStreamsOptions,
    ): AsyncGenerator<AggregationDescription> {
      const lazyAuth = createLazyAuthIdentifier({
        auth: options?.auth,
        env: opts.env,
      });
      const definitionStorage = await getAggregationStorage(opts, identifier);
      if (!definitionStorage) {
        return;
      }
      // Narrow to the definition first; the auth key is resolved afterwards.
      const byDefinition = definitionStorage
        .storage('management')
        .snapshot('aggregations')
        .index('authId')
        .filter('tenant', tenant)
        .filter('realm', identifier.realm)
        .filter('pattern', identifier.pattern)
        .filter('version', identifier.version);
      const authKey = await lazyWrap(lazyAuth, (auth) => auth.key).resolve();
      const rows = byDefinition
        .filter('authId', authKey)
        .range(null, { signal: options.signal });
      for await (const row of rows) {
        yield {
          identifier: {
            tenant: tenant,
            version: row.data.version,
            pattern: row.data.pattern,
            realm: row.data.realm,
            args: row.data.args,
            type: 'aggregation',
            auth: {
              key: row.data.authId,
              aud: row.data.authAud,
              sub: row.data.authSub,
              iss: row.data.authIss,
            },
            name: row.data.name,
          },
          events: row.data.events,
          state: row.data.state,
        };
      }
    },

    /**
     * Builds a stream interface from a registry registration. The
     * registration's identifier is used as an already-resolved lazy value,
     * and its state and events are mapped before being handed over.
     */
    getStreamFromRegistration(
      tenant: string,
      stream: StreamRegistration,
      options: StreamInstanceOptions,
    ): StreamInterface<any, any> {
      const instanceIdentifier = createLazyInstanceIdentifier({
        type: 'stream',
        identifier: resolvedLazy(stream.identifier),
        auth: options?.auth,
        args: options?.args,
        env: opts.env,
      });
      const context = contextFor(tenant, options);

      return createStreamInterface(
        context,
        {
          state: mapStateDefinition(stream.state),
          events: mapEventDefinition(stream.events),
          identifier: instanceIdentifier,
        },
        options.signal,
      );
    },

    /**
     * Builds an aggregate interface from a registry registration. The
     * registration's identifier and events are used as already-resolved lazy
     * values; its state is mapped before being handed over.
     */
    getAggregationFromRegistration(
      tenant: string,
      stream: AggregationRegistration,
      options: StreamInstanceOptions,
    ): AggregateInterface<any> {
      const instanceIdentifier = createLazyInstanceIdentifier({
        type: 'aggregation',
        identifier: resolvedLazy(stream.identifier),
        auth: options?.auth,
        args: options?.args,
        env: opts.env,
      });
      const context = contextFor(tenant, options);

      return createAggregateInterface(
        context,
        {
          state: mapStateDefinition(stream.state),
          events: resolvedLazy(stream.events),
          identifier: instanceIdentifier,
        },
        options.signal,
      );
    },

    /**
     * Builds a stream interface from a host runtime. The definition
     * identifier is produced lazily because the source's version has to be
     * resolved against `opts.env`.
     */
    getStream<
      TState extends StateDeclarations,
      TEvent extends EventDeclarations,
    >(
      tenant: string,
      stream: StreamHostRuntime<TState, TEvent>,
      options: StreamInstanceOptions,
    ): StreamInterface<TState, TEvent> {
      const instanceIdentifier = createLazyInstanceIdentifier({
        type: 'stream',
        env: opts.env,
        args: options?.args,
        auth: options?.auth,
        identifier: createLazy(async () => ({
          tenant,
          realm: stream.source.realm,
          pattern: stream.source.pattern,
          version: await stream.source.version.resolve(opts.env),
        })),
      });
      const context = contextFor(tenant, options);

      return createStreamInterface(
        context,
        {
          state: stream.source.state,
          events: stream.source.events,
          identifier: instanceIdentifier,
        },
        options.signal,
      );
    },

    /**
     * Builds an aggregate interface from a host runtime. Both the definition
     * identifier (whose version is resolved against `opts.env`) and the
     * mapped event sources are produced lazily.
     */
    getAggregation<TState extends StateDeclarations>(
      tenant: string,
      aggregation: AggregateHostRuntime<TState>,
      options: StreamInstanceOptions,
    ): AggregateInterface<TState> {
      const instanceIdentifier = createLazyInstanceIdentifier({
        type: 'aggregation',
        env: opts.env,
        args: options?.args,
        auth: options?.auth,
        identifier: createLazy(async () => ({
          tenant,
          realm: aggregation.source.realm,
          pattern: aggregation.source.pattern,
          version: await aggregation.source.version.resolve(opts.env),
        })),
      });
      const context = contextFor(tenant, options);

      return createAggregateInterface(
        context,
        {
          identifier: instanceIdentifier,
          state: aggregation.source.state,
          events: createLazy(async () =>
            mapAggregationEventSource(aggregation.source.events, opts.env),
          ),
        },
        options.signal,
      );
    },
  };
}
