import { OfflineManagement } from './offline-management.js';
import {
  StateDeclarations,
  StateDefinitions,
} from '@aion/core/storage/state-declaration.js';
import {
  AggregateHostRuntime,
  StreamHostRuntime,
} from '@aion/core/runtime/host-runtime.js';
import { StreamInstanceOptions } from '@aion/core/management/stream-instance-options.js';
import { AggregateInterface } from '@aion/core/management/aggregate-interface.js';
import { EventDeclarations } from '@aion/core/event/event-declarations.js';
import { StreamInterface } from '@aion/core/management/stream-interface.js';
import { ObservedClient } from '@aion/core/storage/observer/observed-client.js';
import { SnapshotClient } from '@aion/core/storage/snapshot-client.js';
import { StoredEvent } from '@aion/core/event/stored-event.js';
import { StreamPushOptions } from '@aion/core/management/stream-push-options.js';
import { declareState } from '@aion/core/storage/declare-state.js';
import { defineState } from '@aion/core/storage/define-state.js';
import { string } from '@aion/core/typing/string.js';
import { date } from '@aion/core/typing/date.js';
import { TrackedStorage } from '@aion/core/storage/tracker/tracked-state.js';
import { generateId } from '@aion/core/management/generate-id.js';
import { DefinitionIdentifier } from '@aion/core/realm/definition-identifier.js';
import { createLazy } from '@aion/core/management/create-lazy.js';
import { RuntimeEnv } from '@aion/core/runtime-env.js';
import { Lazy } from '@aion/core/management/lazy.js';
import { AuthParams } from '../auth/auth-params.js';
import { SessionInterface } from '../management/remote-management-options.js';
import { RealmDefinition } from '@aion/core/realm/realm-definition.js';
import { createMemoryQueue } from '@aion/core/queue/create-memory-queue.js';
import { createMemoryLock } from '@aion/core/lock/create-memory-lock.js';
import { createStaticRegistry } from '@aion/core/realm/create-static-registry.js';
import { createdStorageBuilder } from '@aion/core/storage/tracker/created-storage-builder.js';
import { createManagement } from '@aion/core/management/auth/create-management.js';
import { createWorker } from '@aion/core/management/create-worker.js';
import { createRemoteManagement } from '../management/create-remote-management.js';
import { StorageProvider } from '@aion/core/management/storage-provider.js';

/**
 * Options accepted by `createOfflineManagement`.
 *
 * @typeParam Session - Session object type produced by the `session` factory.
 */
export interface OfflineManagementOptions<Session extends SessionInterface> {
  /** Remote endpoint; forwarded to the session factory and to the remote management. */
  url: string;
  /** Tenant used for the registry, the remote management and every local stream/aggregation lookup. */
  tenant: string;
  /** Runtime environment handed to every component created by the factory. */
  env: RuntimeEnv;
  /** Storage backend wrapped by the storage builder that serves local state and the `offline` store. */
  storageProvider: StorageProvider;

  /** Abort signal passed to the session, storage, management, worker and remote management. */
  signal: AbortSignal;
  /** Factory invoked with the auth parameters (`url`, `tenant`, `signal`, `env`) to build the session. */
  session: (params: AuthParams) => Session;

  /** Stream runtimes registered in the static registry. */
  streams: StreamHostRuntime<any, any>[];
  /** Aggregate runtimes registered in the static registry. */
  aggregations: AggregateHostRuntime<any>[];
  /** Realm definitions registered in the static registry. */
  realms: RealmDefinition<any>[];
}

/**
 * State schema of the `offline` storage opened by `createOfflineManagement`.
 *
 * NOTE(review): the array passed as second argument to `declareState` looks
 * like the list of key fields of each state — confirm against `declareState`.
 */
const OfflineState = defineState({
  // Per-stream-definition record. Not read or written by the code in this
  // file; `lastEventId` presumably tracks the last synchronised event — TODO confirm.
  streams: declareState(
    {
      tenant: string(),
      realm: string(),
      pattern: string(),
      version: string(),
      lastEventId: string(),
    },
    ['tenant', 'realm', 'pattern', 'version'],
  ),
  // One record per event pushed through an offline stream: inserted by
  // `push()` and deleted by `sync()` once the event was pushed remotely.
  localEvents: declareState(
    {
      id: string(),
      tenant: string(),
      realm: string(),
      pattern: string(),
      version: string(),
      createdAt: date(),
    },
    ['tenant', 'realm', 'pattern', 'version', 'id'],
    // The `date` index on `createdAt` is the one `sync()` iterates over.
  ).index('date', ['createdAt'] as const),
});

/**
 * Assembles an offline-capable management facade.
 *
 * A local management (in-memory queue and lock, static registry, storage
 * builder on top of `opts.storageProvider`) and its `worker-browser` worker
 * serve all reads and writes. A remote management is created alongside it and
 * only used as the push target of the offline streams. Locally pushed events
 * are tracked in the `offline` storage.
 *
 * @param opts - Connection, storage and registry configuration.
 * @returns The session plus accessors for aggregations (local only) and
 *   streams (local stream wrapped together with its remote counterpart).
 */
export function createOfflineManagement<Session extends SessionInterface>(
  opts: OfflineManagementOptions<Session>,
): OfflineManagement<Session> {
  // In-memory primitives shared by the local management and its worker.
  const memoryQueue = createMemoryQueue();
  const memoryLock = createMemoryLock();

  const session = opts.session({
    url: opts.url,
    tenant: opts.tenant,
    signal: opts.signal,
    env: opts.env,
  });

  const staticRegistry = createStaticRegistry({
    env: opts.env,
    tenant: opts.tenant,
    aggregations: opts.aggregations,
    streams: opts.streams,
    realms: opts.realms,
  });

  const builder = createdStorageBuilder(
    opts.storageProvider,
    memoryQueue,
    opts.env,
    opts.signal,
  );

  const local = createManagement({
    storageBuilder: builder,
    env: opts.env,
    lock: memoryLock,
    queue: memoryQueue,
    registry: staticRegistry,
    signal: opts.signal,
  });

  // The worker is created for its side effects only; its handle is not kept.
  createWorker({
    env: opts.env,
    management: local,
    registry: staticRegistry,
    queue: memoryQueue,
    signal: opts.signal,
    lock: memoryLock,
    name: 'worker-browser',
    storageBuilder: builder,
  });

  const remote = createRemoteManagement({
    env: opts.env,
    tenant: opts.tenant,
    session: opts.session,
    signal: opts.signal,
    url: opts.url,
  });

  // Bookkeeping store for events that were pushed locally.
  const offlineStore = builder.open('offline', OfflineState, opts.signal);

  /** Aggregations are resolved against the local management only. */
  function getAggregation<TState extends StateDeclarations>(
    aggregation: AggregateHostRuntime<TState>,
    options: StreamInstanceOptions,
  ): AggregateInterface<TState> {
    return local.getAggregation(opts.tenant, aggregation, options);
  }

  /** Pairs the local stream with its remote counterpart behind one interface. */
  function getStream<
    TState extends StateDeclarations,
    TEvent extends EventDeclarations,
  >(
    stream: StreamHostRuntime<TState, TEvent>,
    options: StreamInstanceOptions,
  ): StreamInterface<TState, TEvent> {
    const localStream = local.getStream(opts.tenant, stream, options);
    const remoteStream = remote.getStream(stream, options);

    // Built lazily because resolving the stream version is asynchronous.
    const identifier = createLazy(async () => {
      return {
        realm: stream.source.realm,
        pattern: stream.source.pattern,
        version: await stream.source.version.resolve(opts.env),
        tenant: opts.tenant,
      };
    });

    return createOfflineStream(
      stream.source.state,
      offlineStore,
      identifier,
      localStream,
      remoteStream,
    );
  }

  return { session, getAggregation, getStream };
}

/**
 * A stream interface extended with an explicit synchronisation step.
 *
 * NOTE(review): `TState` is constrained by `StateDefinitions` here while the
 * rest of this file constrains stream state with `StateDeclarations` —
 * confirm this difference is intended.
 */
export interface OfflineStreamInterface<
  TState extends StateDefinitions,
  TEvent extends EventDeclarations,
> extends StreamInterface<TState, TEvent> {
  /**
   * Synchronises the stream. The implementation in this file pushes the
   * locally recorded events it can find in the local stream to the remote
   * stream.
   */
  sync(): Promise<void>;
}

/**
 * Wraps a local stream so that every pushed event is also recorded in the
 * offline storage and can later be replayed against the remote stream.
 *
 * Reads (`get`, `observe`, `snapshot`) are delegated to the local stream.
 *
 * @param state - State declarations exposed on the returned stream.
 * @param offlineStorage - Storage holding the `localEvents` bookkeeping records.
 * @param lazyIdentifier - Lazily resolved realm/pattern/version/tenant of the stream definition.
 * @param localStream - Stream that receives every push and serves every read.
 * @param remoteStream - Stream that receives the events during `sync()`.
 * @returns The offline stream.
 */
function createOfflineStream<
  TState extends StateDeclarations,
  TEvent extends EventDeclarations,
>(
  state: TState,
  offlineStorage: TrackedStorage<typeof OfflineState>,
  lazyIdentifier: Lazy<DefinitionIdentifier>,
  localStream: StreamInterface<TState, TEvent>,
  remoteStream: StreamInterface<TState, TEvent>,
): OfflineStreamInterface<TState, TEvent> {
  const offlineStream: OfflineStreamInterface<TState, TEvent> = {
    state,

    /**
     * Walks the `localEvents` records through the `date` index and pushes
     * each event found in the local stream to the remote stream, deleting
     * the record afterwards.
     */
    async sync(): Promise<void> {
      const pendingRecords = offlineStorage
        .snapshot('localEvents')
        .index('date')
        .range(null);

      for await (const record of pendingRecords) {
        const stored = await localStream.get(record.data.id);
        // Records whose event is unknown to this local stream are left untouched.
        if (stored) {
          const { event, data, createdAt, annotations, id } = stored;
          await remoteStream.push(event, data, { createdAt, annotations, id });
          await offlineStorage.state('localEvents').delete(record.data);
        }
      }
    },

    /**
     * Records the event in `localEvents`, then pushes it to the local stream.
     * The id and creation date default to a generated id and the current
     * time, and the same values are used for both writes.
     */
    async push<K extends string & keyof TEvent>(
      evt: K,
      data: TEvent[K]['_output'],
      options?: Partial<StreamPushOptions>,
    ): Promise<StoredEvent> {
      const id = options?.id ?? generateId();
      const createdAt = options?.createdAt ?? new Date();
      const { realm, pattern, version, tenant } =
        await lazyIdentifier.resolve();

      // The bookkeeping record is written before the local push.
      await offlineStorage
        .state('localEvents')
        .insert({ id, createdAt, realm, pattern, version, tenant });

      return await localStream.push(evt, data, { ...options, createdAt, id });
    },

    /** Looks the event up in the local stream. */
    get(id: string): Promise<StoredEvent | null> {
      return localStream.get(id);
    },

    /** Observes the named state of the local stream. */
    observe<K extends string & keyof TState>(
      name: K,
    ): ObservedClient<TState[K]> {
      return localStream.observe(name);
    },

    /** Takes a snapshot client of the named state of the local stream. */
    snapshot<K extends string & keyof TState>(
      name: K,
    ): SnapshotClient<TState[K]> {
      return localStream.snapshot(name);
    },
  };

  return offlineStream;
}
