import { ListRange } from '@angular/cdk/collections';
import {CdkVirtualForOf, CdkVirtualScrollViewport} from '@angular/cdk/scrolling';
import { DataSource } from '@angular/cdk/table';
import { MatTableDataSource } from '@angular/material/table';
import {BehaviorSubject, combineLatest, concat, from, Observable, of, Subject, Subscription} from 'rxjs';
import {
  delay,
  exhaustMap,
  filter,
  map, merge, share,
  shareReplay,
  startWith,
  switchMap, take, takeUntil,
  tap
} from 'rxjs/operators';
import {NgZone} from "@angular/core";
export class UserTableDataSource extends DataSource<any> {
  private _pageSize = 50; // elements
  private _pages = 100; // pages
  private _pageOffset = 5; // elements
  private _pageCache = new Set<number>();
  private _subscription: Subscription;
  private _viewPort: CdkVirtualScrollViewport;
  private _zone: NgZone;

  // Create MatTableDataSource so we can have all sort,filter bells and whistles
  matTableDataSource: MatTableDataSource<any> = new MatTableDataSource();
  updateCallback: (page) => Observable<any>;
  destroy$ = new Subject();
  scrollStartRange$ = new BehaviorSubject(0);

  // Expose dataStream to simulate VirtualForOf.dataStream
  dataStream = this.matTableDataSource.connect().asObservable();

  constructor(data) {
    super();
    this._pageSize = data.pageSize;
  }

  attach(viewPort: CdkVirtualScrollViewport) {
    if (!viewPort) {
      throw new Error('ViewPort is undefined');
    }
    this._viewPort = viewPort;

    this.initFetchingOnScrollUpdates();

    this._viewPort.detach();
    // Attach DataSource as CdkVirtualForOf so ViewPort can access dataStream
    this._viewPort.attach(this as any);
    this._viewPort.checkViewportSize();
    // Trigger range change so that 1st page can be loaded
    this._viewPort.setRenderedRange({ start: 0, end: 1 });
  }

  // Called by CDK Table
  connect(): Observable<any[]> {
    const tableData = this.matTableDataSource.connect();
    const filtered =
      this._viewPort === undefined
        ? tableData
        : this.filterByRangeStream(tableData);

    return filtered.pipe(shareReplay(1));
  }

  disconnect(): void {
    if (this._subscription) {
      this._subscription.unsubscribe();
    }
  }

  setScrollUpdateCallback(cb) {
    this.updateCallback = cb;
  }

  destroy() {
    this._viewPort.scrollToIndex(0);
    this.destroy$.next();
    if (this._subscription) {
      this._subscription.unsubscribe();
    }
  }

  getNextPage() {
    of(null).pipe(
      map(() => {
        const range = this._viewPort.getRenderedRange();
        return { start: range.start, end: range.end + (+this._pageSize) }
      }),
      switchMap(range => this._getPagesToDownload(range)),
      filter(page => !this._pageCache.has(page)),
      exhaustMap(page => this._simulateFetchAndUpdate(page)),
      takeUntil(this.destroy$)
    ).subscribe();
  }

  private initFetchingOnScrollUpdates() {
    this._subscription = this._viewPort.renderedRangeStream
      .pipe(
        tap(range => this.scrollStartRange$.next(range.start)),
        switchMap(range => this._getPagesToDownload(range)),
        filter(page => !this._pageCache.has(page)),
        exhaustMap(page => this._simulateFetchAndUpdate(page)),
        takeUntil(this.destroy$)
      )
      .subscribe();
  }

  private _getPagesToDownload({ start, end }: { start: number; end: number }) {
    const startPage = this._getPageForIndex(start);
    const endPage = this._getPageForIndex(end + this._pageOffset);
    const pages: number[] = [];
    for (let i = startPage; i <= endPage && i < this._pages; i++) {
      if (!this._pageCache.has(i)) {
        pages.push(i);
      }
    }
    return pages;
  }

  private _getPageForIndex(index: number): number {
    return Math.floor(index / this._pageSize);
  }

  private filterByRangeStream(tableData: Observable<any[]>) {
    const rangeStream = this._viewPort.renderedRangeStream.pipe(
      startWith({} as ListRange),
      takeUntil(this.destroy$)
    );
    const filtered = combineLatest(tableData, rangeStream).pipe(
      map(([data, { start, end }]) =>
        start === null || end === null ? data : data.slice(start, end)
      ),
      takeUntil(this.destroy$)
    );
    return filtered;
  }

  private _simulateFetchAndUpdate(page: number): Observable<any[]> {
    return of(page).pipe(
      filter(page => !this._pageCache.has(page)),
      switchMap(() => {
        return this.updateCallback(page);
      }),
      tap(() => this._pageCache.add(page)),
      tap(users => {
        const newData = [...this.matTableDataSource.data];
        newData.splice(page * this._pageSize, this._pageSize, ...users);
        this.matTableDataSource.data = newData;
      }),
      takeUntil(this.destroy$)
    );
  }
}
