Skip to content

13 - RxJS与异步编程

📖 学习目标

通过本章节学习,您将掌握:

  • RxJS核心概念和原理
  • 高级操作符的使用
  • Subject和BehaviorSubject
  • 调度器(Schedulers)
  • 错误处理策略
  • 内存泄漏预防
  • 性能优化技巧

🎯 核心概念

1. 什么是RxJS?

RxJS (Reactive Extensions for JavaScript) 是一个用于处理异步和基于事件的程序的库,它使用Observable序列来编写异步和基于事件的程序。

2. 核心概念

Observable (可观察对象) - 数据流
├── Observer (观察者) - 订阅者
├── Subscription (订阅) - 连接
├── Operators (操作符) - 转换
└── Subject (主体) - 多播

3. 响应式编程模式

数据源 → 操作符链 → 订阅者
   ↓         ↓         ↓
Observable → Operators → Observer

🔧 Observable基础

1. 创建Observable

typescript
import { Observable, of, from, interval, timer } from 'rxjs';

// 创建简单值
const simple$ = of(1, 2, 3, 4, 5);

// 从数组创建
const array$ = from([1, 2, 3, 4, 5]);

// 从Promise创建
const promise$ = from(fetch('/api/data'));

// 定时器
const interval$ = interval(1000); // 每秒发射
const timer$ = timer(2000, 1000); // 2秒后开始,每秒发射

// 自定义Observable
const custom$ = new Observable(observer => {
  observer.next('Hello');
  observer.next('World');
  observer.complete();
});

2. 订阅Observable

typescript
// 基本订阅
const subscription = observable.subscribe({
  next: value => console.log('Next:', value),
  error: error => console.error('Error:', error),
  complete: () => console.log('Complete')
});

// 简化订阅
observable.subscribe(
  value => console.log('Next:', value),
  error => console.error('Error:', error),
  () => console.log('Complete')
);

// 只处理next
observable.subscribe(value => console.log('Next:', value));

🎯 高级操作符

1. 转换操作符

typescript
import { map, switchMap, mergeMap, concatMap, exhaustMap } from 'rxjs/operators';

// map - 转换每个值
const doubled$ = source$.pipe(
  map(x => x * 2)
);

// switchMap - 切换到新的Observable
const userPosts$ = userId$.pipe(
  switchMap(id => this.userService.getUserPosts(id))
);

// mergeMap - 合并多个Observable
const searchResults$ = searchQuery$.pipe(
  mergeMap(query => this.searchService.search(query))
);

// concatMap - 按顺序执行
const uploadFiles$ = fileList$.pipe(
  concatMap(file => this.uploadService.upload(file))
);

// exhaustMap - 忽略新的请求直到当前完成
const saveData$ = formData$.pipe(
  exhaustMap(data => this.dataService.save(data))
);

2. 过滤操作符

typescript
import { filter, take, takeUntil, takeWhile, skip, distinct } from 'rxjs/operators';

// filter - 过滤值
const evenNumbers$ = numbers$.pipe(
  filter(x => x % 2 === 0)
);

// take - 取前N个值
const firstFive$ = source$.pipe(
  take(5)
);

// takeUntil - 直到某个条件
const data$ = source$.pipe(
  takeUntil(this.destroy$)
);

// takeWhile - 当条件为真时取值
const ascending$ = numbers$.pipe(
  takeWhile(x => x < 10)
);

// skip - 跳过前N个值
const afterFirst$ = source$.pipe(
  skip(1)
);

// distinct - 去重
const unique$ = source$.pipe(
  distinct()
);

3. 组合操作符

typescript
import { combineLatest, merge, zip, withLatestFrom } from 'rxjs';
import { startWith, debounceTime, throttleTime } from 'rxjs/operators';

// combineLatest - 组合最新的值
const combined$ = combineLatest([
  user$,
  posts$,
  comments$
]).pipe(
  map(([user, posts, comments]) => ({
    user,
    posts,
    comments
  }))
);

// merge - 合并多个Observable
const merged$ = merge(
  click$,
  keyup$,
  scroll$
);

// zip - 按索引组合
const zipped$ = zip(
  names$,
  ages$,
  cities$
).pipe(
  map(([name, age, city]) => ({ name, age, city }))
);

// withLatestFrom - 与最新的值组合
const searchResults$ = searchQuery$.pipe(
  withLatestFrom(filters$),
  switchMap(([query, filters]) => 
    this.searchService.search(query, filters)
  )
);

// debounceTime - 防抖
const debouncedSearch$ = searchInput$.pipe(
  debounceTime(300),
  distinctUntilChanged()
);

// throttleTime - 节流
const throttledScroll$ = scroll$.pipe(
  throttleTime(100)
);

🔄 Subject和BehaviorSubject

1. Subject

typescript
import { Subject } from 'rxjs';

// 创建Subject
const subject = new Subject<string>();

// 订阅
const subscription1 = subject.subscribe(value => 
  console.log('Observer 1:', value)
);

const subscription2 = subject.subscribe(value => 
  console.log('Observer 2:', value)
);

// 发射值
subject.next('Hello');
subject.next('World');

// 完成
subject.complete();

// 错误
subject.error(new Error('Something went wrong'));

2. BehaviorSubject

typescript
import { BehaviorSubject } from 'rxjs';

// 创建BehaviorSubject(需要初始值)
const behaviorSubject = new BehaviorSubject<string>('Initial Value');

// 订阅(会立即收到当前值)
const subscription = behaviorSubject.subscribe(value => 
  console.log('Current value:', value)
);

// 获取当前值
console.log('Current value:', behaviorSubject.value);

// 发射新值
behaviorSubject.next('New Value');

3. 实际应用示例

typescript
// 用户认证状态管理
@Injectable({
  providedIn: 'root'
})
export class AuthService {
  private userSubject = new BehaviorSubject<User | null>(null);
  public user$ = this.userSubject.asObservable();

  constructor() {
    // 从localStorage恢复用户状态
    const savedUser = localStorage.getItem('user');
    if (savedUser) {
      this.userSubject.next(JSON.parse(savedUser));
    }
  }

  login(user: User) {
    this.userSubject.next(user);
    localStorage.setItem('user', JSON.stringify(user));
  }

  logout() {
    this.userSubject.next(null);
    localStorage.removeItem('user');
  }

  getCurrentUser(): User | null {
    return this.userSubject.value;
  }
}

// 使用服务
@Component({
  selector: 'app-header',
  template: `
    <div *ngIf="user$ | async as user">
      欢迎,{{ user.name }}!
      <button (click)="logout()">退出</button>
    </div>
    <div *ngIf="!(user$ | async)">
      <a routerLink="/login">登录</a>
    </div>
  `
})
export class HeaderComponent {
  user$ = this.authService.user$;

  constructor(private authService: AuthService) {}

  logout() {
    this.authService.logout();
  }
}

⚡ 调度器(Schedulers)

1. 调度器类型

typescript
import { 
  asyncScheduler, 
  queueScheduler, 
  asapScheduler, 
  animationFrameScheduler 
} from 'rxjs';

// asyncScheduler - 异步执行
const async$ = of(1, 2, 3).pipe(
  observeOn(asyncScheduler)
);

// queueScheduler - 同步执行
const queue$ = of(1, 2, 3).pipe(
  observeOn(queueScheduler)
);

// asapScheduler - 尽快异步执行
const asap$ = of(1, 2, 3).pipe(
  observeOn(asapScheduler)
);

// animationFrameScheduler - 在下一帧执行
const animation$ = of(1, 2, 3).pipe(
  observeOn(animationFrameScheduler)
);

2. 实际应用

typescript
// 防抖搜索
const searchResults$ = searchInput$.pipe(
  debounceTime(300, asyncScheduler),
  distinctUntilChanged(),
  switchMap(query => this.searchService.search(query))
);

// 动画优化
const animation$ = scroll$.pipe(
  throttleTime(16, animationFrameScheduler), // 60fps
  map(event => event.target.scrollTop)
);

🛡️ 错误处理

1. 基本错误处理

typescript
import { catchError, retry, retryWhen, delay } from 'rxjs/operators';
import { throwError, timer } from 'rxjs';

// catchError - 捕获错误
const safeData$ = data$.pipe(
  catchError(error => {
    console.error('Error:', error);
    return of([]); // 返回默认值
  })
);

// retry - 重试
const retryData$ = data$.pipe(
  retry(3) // 重试3次
);

// retryWhen - 条件重试
const retryWhenData$ = data$.pipe(
  retryWhen(errors => 
    errors.pipe(
      delay(1000),
      take(3)
    )
  )
);

// 自定义错误处理
const customError$ = data$.pipe(
  catchError(error => {
    if (error.status === 404) {
      return of(null);
    } else if (error.status === 500) {
      return throwError('服务器错误');
    } else {
      return throwError('未知错误');
    }
  })
);

2. 全局错误处理

typescript
// 全局错误处理服务
@Injectable({
  providedIn: 'root'
})
export class ErrorHandlerService {
  private errorSubject = new Subject<Error>();
  public error$ = this.errorSubject.asObservable();

  handleError(error: any) {
    console.error('Global error:', error);
    this.errorSubject.next(error);
    
    // 发送到错误监控服务
    this.sendToMonitoring(error);
  }

  private sendToMonitoring(error: any) {
    // 发送到错误监控服务
  }
}

// HTTP拦截器
@Injectable()
export class ErrorInterceptor implements HttpInterceptor {
  constructor(private errorHandler: ErrorHandlerService) {}

  intercept(req: HttpRequest<any>, next: HttpHandler): Observable<HttpEvent<any>> {
    return next.handle(req).pipe(
      catchError(error => {
        this.errorHandler.handleError(error);
        return throwError(error);
      })
    );
  }
}

🧠 内存泄漏预防

1. 订阅管理

typescript
import { Component, OnInit, OnDestroy } from '@angular/core';
import { Subject, Subscription } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

@Component({
  selector: 'app-data-component',
  template: `<div>{{ data }}</div>`
})
export class DataComponent implements OnInit, OnDestroy {
  private destroy$ = new Subject<void>();
  data: any;

  ngOnInit() {
    // 使用takeUntil自动取消订阅
    this.dataService.getData()
      .pipe(takeUntil(this.destroy$))
      .subscribe(data => {
        this.data = data;
      });

    // 多个订阅
    this.userService.getUser()
      .pipe(takeUntil(this.destroy$))
      .subscribe(user => {
        // 处理用户数据
      });
  }

  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

2. 手动订阅管理

typescript
@Component({
  selector: 'app-manual-component',
  template: `<div>{{ data }}</div>`
})
export class ManualComponent implements OnInit, OnDestroy {
  private subscriptions: Subscription[] = [];
  data: any;

  ngOnInit() {
    // 收集订阅
    this.subscriptions.push(
      this.dataService.getData().subscribe(data => {
        this.data = data;
      })
    );

    this.subscriptions.push(
      this.userService.getUser().subscribe(user => {
        // 处理用户数据
      })
    );
  }

  ngOnDestroy() {
    // 取消所有订阅
    this.subscriptions.forEach(sub => sub.unsubscribe());
  }
}

🎮 实践练习

练习1:实现搜索功能

创建一个带有防抖、去重、错误处理的搜索功能:

  • 用户输入搜索关键词
  • 300ms防抖
  • 去重相同查询
  • 错误处理和重试
  • 加载状态管理

练习2:实现实时数据更新

创建一个实时数据更新系统:

  • 使用WebSocket连接
  • 自动重连机制
  • 错误处理和恢复
  • 数据缓存和同步

📚 详细示例

完整的搜索组件

typescript
// search.component.ts
import { Component, OnInit, OnDestroy } from '@angular/core';
import { FormControl } from '@angular/forms';
import { Subject, BehaviorSubject, combineLatest } from 'rxjs';
import { 
  debounceTime, 
  distinctUntilChanged, 
  switchMap, 
  catchError, 
  startWith,
  takeUntil,
  retry
} from 'rxjs/operators';
import { SearchService } from './search.service';

export interface SearchResult {
  id: number;
  title: string;
  description: string;
  type: string;
}

export interface SearchState {
  results: SearchResult[];
  loading: boolean;
  error: string | null;
  query: string;
}

@Component({
  selector: 'app-search',
  template: `
    <div class="search-container">
      <input 
        [formControl]="searchControl"
        placeholder="搜索..."
        class="search-input">
      
      <div *ngIf="searchState$ | async as state" class="search-results">
        <div *ngIf="state.loading" class="loading">
          搜索中...
        </div>
        
        <div *ngIf="state.error" class="error">
          错误: {{ state.error }}
          <button (click)="retrySearch()">重试</button>
        </div>
        
        <div *ngIf="!state.loading && !state.error" class="results">
          <div *ngFor="let result of state.results" class="result-item">
            <h3>{{ result.title }}</h3>
            <p>{{ result.description }}</p>
            <span class="type">{{ result.type }}</span>
          </div>
          
          <div *ngIf="state.results.length === 0 && state.query" class="no-results">
            没有找到相关结果
          </div>
        </div>
      </div>
    </div>
  `,
  styleUrls: ['./search.component.css']
})
export class SearchComponent implements OnInit, OnDestroy {
  searchControl = new FormControl('');
  private destroy$ = new Subject<void>();
  private retrySubject = new Subject<void>();
  
  searchState$ = new BehaviorSubject<SearchState>({
    results: [],
    loading: false,
    error: null,
    query: ''
  });

  constructor(private searchService: SearchService) {}

  ngOnInit() {
    // 搜索流
    const searchQuery$ = this.searchControl.valueChanges.pipe(
      debounceTime(300),
      distinctUntilChanged(),
      startWith(''),
      takeUntil(this.destroy$)
    );

    // 重试流
    const retry$ = this.retrySubject.pipe(
      switchMap(() => this.searchControl.value),
      takeUntil(this.destroy$)
    );

    // 合并搜索和重试流
    const searchTrigger$ = combineLatest([searchQuery$, retry$]).pipe(
      map(([query]) => query),
      distinctUntilChanged()
    );

    // 执行搜索
    searchTrigger$.pipe(
      switchMap(query => {
        if (!query || query.trim() === '') {
          return of({
            results: [],
            loading: false,
            error: null,
            query: ''
          });
        }

        this.updateState({ loading: true, error: null, query });

        return this.searchService.search(query).pipe(
          map(results => ({
            results,
            loading: false,
            error: null,
            query
          })),
          catchError(error => {
            console.error('Search error:', error);
            return of({
              results: [],
              loading: false,
              error: '搜索失败,请重试',
              query
            });
          }),
          retry(2) // 重试2次
        );
      }),
      takeUntil(this.destroy$)
    ).subscribe(state => {
      this.searchState$.next(state);
    });
  }

  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }

  retrySearch() {
    this.retrySubject.next();
  }

  private updateState(partialState: Partial<SearchState>) {
    const currentState = this.searchState$.value;
    this.searchState$.next({ ...currentState, ...partialState });
  }
}

实时数据服务

typescript
// realtime-data.service.ts
import { Injectable } from '@angular/core';
import { Observable, Subject, BehaviorSubject, timer } from 'rxjs';
import { 
  switchMap, 
  catchError, 
  retry, 
  takeUntil,
  shareReplay
} from 'rxjs/operators';

@Injectable({
  providedIn: 'root'
})
export class RealtimeDataService {
  private connectionSubject = new BehaviorSubject<boolean>(false);
  private dataSubject = new Subject<any>();
  private destroy$ = new Subject<void>();

  public connection$ = this.connectionSubject.asObservable();
  public data$ = this.dataSubject.asObservable();

  constructor() {
    this.initializeConnection();
  }

  private initializeConnection() {
    // 模拟WebSocket连接
    timer(0, 5000).pipe(
      switchMap(() => this.fetchData()),
      catchError(error => {
        console.error('Connection error:', error);
        this.connectionSubject.next(false);
        return this.reconnect();
      }),
      retry(3),
      takeUntil(this.destroy$)
    ).subscribe(data => {
      this.dataSubject.next(data);
      this.connectionSubject.next(true);
    });
  }

  private fetchData(): Observable<any> {
    // 模拟API调用
    return new Observable(observer => {
      setTimeout(() => {
        observer.next({
          timestamp: new Date(),
          value: Math.random() * 100
        });
        observer.complete();
      }, 1000);
    });
  }

  private reconnect(): Observable<any> {
    return timer(5000).pipe(
      switchMap(() => this.fetchData())
    );
  }

  disconnect() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

🔧 性能优化技巧

1. 使用shareReplay

typescript
// 共享Observable结果
const sharedData$ = this.dataService.getData().pipe(
  shareReplay(1) // 缓存最后一个值
);

// 多个订阅者共享同一个请求
const subscription1 = sharedData$.subscribe(data => console.log('Sub1:', data));
const subscription2 = sharedData$.subscribe(data => console.log('Sub2:', data));

2. 使用distinctUntilChanged

typescript
// 只在值真正改变时发射
const distinctData$ = data$.pipe(
  distinctUntilChanged()
);

3. 使用auditTime

typescript
// 限制发射频率
const throttledData$ = data$.pipe(
  auditTime(100) // 每100ms最多发射一次
);

✅ 学习检查

完成本章节后,请确认您能够:

  • [ ] 理解RxJS核心概念
  • [ ] 使用高级操作符
  • [ ] 使用Subject和BehaviorSubject
  • [ ] 理解调度器的作用
  • [ ] 实现错误处理策略
  • [ ] 预防内存泄漏
  • [ ] 优化性能

🚀 下一步

完成本章节学习后,请继续学习14-测试