import {
  StateDefinition,
  StateKeyValues,
} from '@aion/core/storage/state-declaration.js';
import { ObservedClient } from '@aion/core/storage/observer/observed-client.js';
import { ObservedIndexClient } from '@aion/core/storage/observer/observed-index-client.js';
import { BrowseResult, RangeResult } from '@aion/core/storage/browse-result.js';
import { createRemoteObservedIndexClient } from './create-remote-observed-index-client.js';
import { ObserveRangeOptions } from '@aion/core/storage/observer/observe-range-options.js';
import { IterateOptions } from '@aion/core/storage/observer/iterate-options.js';
import { RemoteDescription } from './remote-description.js';
import { RpcClient } from '../api/create-rpc-client.js';
import { SessionManager } from '../auth/jwt-session.js';
import { Lazy } from '@aion/core/management/lazy.js';
import { mapToState } from '@aion/core/runtime/get-state-data-keys.js';

/**
 * Builds an {@link ObservedClient} whose methods forward to endpoints on the
 * given RPC client and map the returned items through `mapToState`.
 *
 * Every request carries the same identifying fields: `tenant`, `realm`,
 * `pattern` and `args` from `streamIdentifier`, `state` from `stateName`,
 * `type`, plus a `version` and `token` that are resolved anew on each call
 * (version first, then token).
 *
 * @param state - State definition; passed to `mapToState` for every received
 *   item, and its `indices` are looked up by `index()`.
 * @param rpcClient - Client exposing the `state.get.observe`, `state.iterate`
 *   and `state.browse.observe` endpoints.
 * @param session - Source of the `token` field (`resolveToken()` is awaited
 *   on each call).
 * @param lazyVersion - Source of the `version` field (`resolve()` is awaited
 *   on each call).
 * @param type - Forwarded unchanged as the `type` field of every request.
 * @param streamIdentifier - Supplies `tenant`, `realm`, `pattern` and `args`.
 * @param stateName - Sent as the `state` field of every request.
 * @returns An object implementing `observe`, `iterate`, `index` and
 *   `observeRange`.
 */
export function createRemoteObservedClient<
  TState extends StateDefinition<any, any, any>,
>(
  state: TState,
  rpcClient: RpcClient,
  session: SessionManager,
  lazyVersion: Lazy<string>,
  type: 'stream' | 'aggregation',
  streamIdentifier: RemoteDescription,
  stateName: string,
): ObservedClient<TState> {
  return {
    /**
     * Calls `state.get.observe` for the given key values and yields each item
     * of the response after mapping it with `mapToState`.
     */
    async *observe(
      id: StateKeyValues<TState>,
      options,
    ): AsyncGenerator<any | null> {
      const response = await rpcClient['state.get.observe'].call(
        {
          type,
          tenant: streamIdentifier.tenant,
          realm: streamIdentifier.realm,
          version: await lazyVersion.resolve(),
          pattern: streamIdentifier.pattern,
          state: stateName,
          id,
          args: streamIdentifier.args,
          token: await session.resolveToken(),
        },
        options.signal,
      );
      for await (const item of response.result()) {
        yield mapToState(state, item);
      }
    },
    /**
     * Calls `state.iterate` and yields one `{ data, keys }` entry per item of
     * the response, with `data` mapped through `mapToState`.
     */
    async *iterate(
      bookmark: StateKeyValues<TState> | null,
      iterateOptions: IterateOptions,
    ): AsyncGenerator<RangeResult<any>> {
      const response = await rpcClient['state.iterate'].call(
        {
          tenant: streamIdentifier.tenant,
          realm: streamIdentifier.realm,
          version: await lazyVersion.resolve(),
          pattern: streamIdentifier.pattern,
          state: stateName,
          // The request uses `undefined`, not `null`, for "no bookmark".
          bookmark: bookmark ?? undefined,
          direction: iterateOptions.dir,
          open: iterateOptions.open,
          token: await session.resolveToken(),
          args: streamIdentifier.args,
          type,
          // No item limit is ever sent from this method.
          take: undefined,
        },
        iterateOptions.signal,
      );

      for await (const item of response.result()) {
        yield {
          data: mapToState(state, item.data),
          keys: item.keys,
        };
      }
    },
    /**
     * Returns an index client for the named index of `state`, created by
     * `createRemoteObservedIndexClient` with the same RPC client, session,
     * version, type, stream identifier and state name as this client.
     */
    index<K extends keyof TState['indices'] & string>(
      index: K,
    ): ObservedIndexClient<TState, TState['indices'][K]> {
      return createRemoteObservedIndexClient(
        state,
        state.indices[index],
        // NOTE(review): the meaning of this literal 0 and of the trailing
        // empty object is defined by createRemoteObservedIndexClient and is
        // not visible here; presumably initial values, confirm there.
        0,
        rpcClient,
        session,
        lazyVersion,
        type,
        streamIdentifier,
        stateName,
        index,
        {},
      );
    },
    /**
     * Calls `state.browse.observe` and yields one result per item of the
     * response: the item's `bookmark` together with its `items`, each with
     * `data` mapped through `mapToState`.
     */
    async *observeRange(
      bookmark: StateKeyValues<TState> | null,
      options: ObserveRangeOptions,
    ): AsyncGenerator<BrowseResult<any>> {
      const response = await rpcClient['state.browse.observe'].call(
        {
          tenant: streamIdentifier.tenant,
          realm: streamIdentifier.realm,
          version: await lazyVersion.resolve(),
          pattern: streamIdentifier.pattern,
          state: stateName,
          args: streamIdentifier.args,
          // The request uses `undefined`, not `null`, for "no bookmark".
          bookmark: bookmark ?? undefined,
          // `options` is a required parameter, so it is read with plain
          // member access throughout, matching `open` and `signal` below.
          take: options.take,
          direction: options.dir,
          token: await session.resolveToken(),
          type,
          open: options.open,
        },
        options.signal,
      );
      for await (const item of response.result()) {
        yield {
          bookmark: item.bookmark,
          items: item.items.map((i) => ({
            data: mapToState(state, i.data),
            keys: i.keys,
          })),
        };
      }
    },
  };
}
