반응형
반응형
반응형

Java Stream

 

Java Stream 특징

- Java 8부터 도입된 기능으로, 컬렉션, 배열, 파일 등의 데이터 요소를 처리하는 연산을 지원하는 함수형 Stream 입니다.

- 데이터 소스로부터 연속된 데이터 흐름을 제공하여 데이터를 처리합니다.

- 데이터를 효율적으로 처리할 수 있으며, 병렬 처리를 포함한 다양한 연산을 적용할 수 있습니다. 

- 데이터를 필터링, 매핑, 정렬, 그룹핑 등 다양한 작업을 수행할 수 있으며, 함수형 프로그래밍 스타일을 지원하여 코드를 간결하고 가독성 있게 작성할 수 있습니다.

- 중간 연산과 최종 연산으로 구성된 파이프라인을 만들 수 있습니다. 중간 연산은 다른 Stream 을 반환하며, 최종 연산은 최종 결과를 반환합니다.

 

Java Stream 처리 단계

1. 데이터 소스 생성

 - 컬렉션, 배열, 파일 등의 데이터 소스로부터 Stream 을 생성합니다.

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
Stream<Integer> stream = numbers.stream();

 

2. 중간 연산

 - 필터링, 매핑, 정렬 등의 작업을 수행할 수 있습니다.

stream = stream.filter(n -> n % 2 == 0) // 짝수만 필터링
               .map(n -> n * 2)         // 각 요소를 2배로 매핑
               .sorted();               // 정렬

 

3. 최종 연산

 - 리스트, 배열, 요소 개수 등 다양한 결과를 얻을 수 있습니다.

// 요소 개수 세기(count)
long count = numbers.stream()
                   .count();
System.out.println(count);  // 출력: 5

// 요소들의 합 구하기(sum)
int sum = numbers.stream()
                .mapToInt(Integer::intValue)
                .sum();
System.out.println(sum);  // 출력: 15


// 최대값 찾기(max)
Optional<Integer> max = numbers.stream()
                               .max(Comparator.naturalOrder());
System.out.println(max.orElse(0));  // 출력: 5


// 요소들의 평균 구하기(average)
OptionalDouble average = numbers.stream()
                               .mapToInt(Integer::intValue)
                               .average();
System.out.println(average.orElse(0));  // 출력: 3.0


// 요소들을 리스트로 변환(collect)
List<Integer> doubledNumbers = numbers.stream()
                                      .map(n -> n * 2)
                                      .collect(Collectors.toList());
System.out.println(doubledNumbers);  // 출력: [2, 4, 6, 8, 10]

※ 이외에도 최소값 찾기(min), 문자열로 연결하기(join), 그룹화하기(groupingBy) 등 다양한 최종 연산이 있습니다.

 

Java Stream VS Parallel Stream

구분 Stream Parallel Stream
데이터 처리 방식 순차적으로 연산을 수행 병렬로 연산을 수행
처리 속도 단일 CPU 코어에서 동작하며, 작업을 직렬로 처리.  멀티코어 시스템에서 데이터 처리 속도를 높일 수 있음.
작업을 분할하고 스레드 간의 동기화 오버헤드가 발생할 수 있으므로, 작업량이 많거나 데이터가 충분히 큰 경우에 이점.
스레드 안전성 - 멀티 스레드에서 작업을 수행하므로 스레드 안전성에 주의.
thread-safe 한 자료구조를 사용하거나 동기화 메커니즘을 적절하게 사용
순서 보장 연산 순서가 보장되며, 요소 순서 유지 병렬로 처리하기 때문에 요소의 순서가 보장되지 않을 수 있음. 
사용처 데이터의 순서가 중요하거나 상호작용이 필요한 경우 유용 멀티코어 CPU 시스템에서 병렬 처리에 특히 유용하며, 대량의 데이터를 동시에 처리해야 할 때 성능 향상을 제공

 

소스로 비교

public static void main(String[] args) {
    // 데이터 소스
    int[] numbers = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};

    // stream
    int sumSequential = Arrays.stream(numbers).sum();
    System.out.println("Sum (Sequential): " + sumSequential);

    // parallel stream
    int sumParallel = Arrays.stream(numbers)
        .parallel()
        .sum();
    System.out.println("Sum (Parallel): " + sumParallel);
}
    
// 결과 출력
Sum (Sequential): 55
Sum (Parallel): 55

stream : Arrays.stream(numbers)를 호출하여 스트림을 생성하고, sum() 메서드를 호출하여 모든 요소의 합을 계산

parallel stream : Arrays.stream(numbers)를 호출하여 스트림을 생성한 후 .parallel() 메서드를 호출하여 병렬 스트림으로 전환, 그리고 나서 sum() 메서드를 호출하여 병렬로 모든 요소의 합을 계산

※  합계를 구하는 간단한 작업. 작은 데이터셋이므로 parallel stream 을 사용하여도 순차 stream 과 동일한 결과.

 

parallel stream 잘못된 사용 예제

public static void main(String[] args) {
    // 데이터 소스
    int[] numbers = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};

    // 잘못된 사용 예제
    int sum = Arrays.stream(numbers)
        .parallel()
        .map(n -> {
            if (n == 5) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return n;
        })
        .sum();
        
    System.out.println("Sum: " + sum);
}

// 결과 출력
Sum: 50

 - 배열 numbers를 데이터 소스로 사용하여 parallel stream을 이용하여 요소들을 처리하고 합계(sum)를 계산

- parallel stream 내부의 map 연산에서 특정 조건(n == 5)에 따라 5인 경우 1초의 지연을 발생

- parallel stream은 요소를 병렬로 처리하기 때문에 map 연산의 각 요소는 병렬로 실행

- 조건 n == 5를 만족하는 요소가 존재할 경우 1초의 지연이 발생하고, 이로 인해 전체 연산에 대한 지연이 발생

 

parallel stream 처리 순서 확인

-public static void main(String[] args) {
    // 데이터 소스
    int[] numbers = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};

    Arrays.stream(numbers)
        .parallel()
        .map(n -> {
            System.out.println("Map: " + n + " - " + Thread.currentThread().getName());
            return n;
        })
        .forEach(n -> System.out.println("ForEach: " + n + " - " + Thread.currentThread().getName()));
-}

// 결과 출력
Map: 1 - main
Map: 2 - ForkJoinPool.commonPool-worker-1
Map: 5 - ForkJoinPool.commonPool-worker-2
Map: 4 - ForkJoinPool.commonPool-worker-3
Map: 3 - ForkJoinPool.commonPool-worker-4
Map: 6 - main
ForEach: 6 - main
ForEach: 1 - main
ForEach: 2 - ForkJoinPool.commonPool-worker-1
ForEach: 3 - ForkJoinPool.commonPool-worker-4
ForEach: 4 - ForkJoinPool.commonPool-worker-3
ForEach: 5 - ForkJoinPool.commonPool-worker-2

- 배열 numbers를 데이터 소스로 사용하여 parallel stream을 생성하고, map 연산과 forEach 연산을 순차적으로 수행.

- map 연산은 각 요소를 변환하고, forEach 연산은 각 요소를 출력.

- Thread.currentThread().getName() : 현재 실행 중인 스레드의 이름 출력.

- map 연산은 여러 스레드에서 동시에 실행되므로 요소의 처리 순서가 보장되지 않음. 

- forEach 연산은 마지막에 순차적으로 수행되며, 최종 결과인 출력도 순차적으로 출력

※ 중간 연산과 최종 연산의 특성에 따라 다를 수 있으므로, 순서에 의존하는 작업을 수행하는 경우에는 주의가 필요

 

Java 8 에 추가된 기능인 Stream 에 대해서 간략하게 알아보았습니다. 더불어 parallel stream 과의 비교도 확인하였습니다.

반응형
반응형

Java Stream reduce 함수에 대한 설명입니다.

 

바로 소스로 들어가보자!

import java.util.OptionalInt;
import java.util.stream.IntStream;

public class StreamReduce {

    public static void main(String[] args) {
        //Optional<T> reduce(BinaryOperator<T> accumulator);
        OptionalInt reduced1 = IntStream.range(1, 5) // [1, 2, 3, 4]
                        .reduce((a, b) -> {
                            System.out.println("a = " + a);
                            System.out.println("b = " + b);
                            return Integer.sum(a, b);
                        });

        System.out.println(reduced1.getAsInt());


        //T reduce(T identity, BinaryOperator<T> accumulator);
        int reduced2 = IntStream.range(1, 5) // [1, 2, 3, 4]
                        .reduce(10, (a, b) -> {
                            System.out.println("a = " + a);
                            System.out.println("b = " + b);
                            return Integer.sum(a, b);
                        });

        System.out.println(reduced2);
    }
}

 

긴말 필요 없이 결과도 바로 확인해보자!

a = 1
b = 2
a = 3
b = 3
a = 6
b = 4
10

a = 10
b = 1
a = 11
b = 2
a = 13
b = 3
a = 16
b = 4
20

 

Stream 생성 했던 IntStream 의 range 함수는 Javadoc 으로 확인해본다.

IntStream (Java Platform SE 8 ) (oracle.com)

 

IntStream (Java Platform SE 8 )

Returns an infinite sequential ordered IntStream produced by iterative application of a function f to an initial element seed, producing a Stream consisting of seed, f(seed), f(f(seed)), etc. The first element (position 0) in the IntStream will be the prov

docs.oracle.com

 

※ 첫번째 파라미터는 포함, 두번째 파라미터는 불포함

 - 항상 헷갈리는 부분이니까 꼼꼼히 읽어보자!!

static IntStream range(int startInclusive, int endExclusive)
Returns a sequential ordered IntStream from startInclusive (inclusive) to endExclusive (exclusive) by an incremental step of 1.
API Note:
An equivalent sequence of increasing values can be produced sequentially using a for loop as follows:

     for (int i = startInclusive; i < endExclusive ; i++) { ... }
 
Parameters:
startInclusive - the (inclusive) initial value
endExclusive - the exclusive upper bound
Returns:
a sequential IntStream for the range of int elements

 

 

자, 이제 본격적으로 Stream reduce 에 대한 설명 들어보고 가실께여~

1. 먼저 파라미터가 1개인 reduce 함수

        //Optional<T> reduce(BinaryOperator<T> accumulator);
        OptionalInt reduced1 = IntStream.range(1, 5) // [1, 2, 3, 4]
                        .reduce((a, b) -> {
                            System.out.println("a = " + a);
                            System.out.println("b = " + b);
                            return Integer.sum(a, b);
                        });

        System.out.println(reduced1.getAsInt());

첫번째 실행

 - a : Stream 의 첫번째 값 = 1

 - b : Stream 의 두번째 값 = 2

 - a + b = 3 을 반환(return) 한다.

두번째 실행

 - a : 첫번째 실행한 결과 값 = 3

 - b : Stream 의 다음(세번째) 값 = 3

 - a + b = 6 을 반환(return) 한다.

세번째 실행

 - a : 두번째 실행한 결과 값 = 6

 - b : Stream 의 다음(네번째) 값 = 4

 - a + b = 10 을 반환(return) 한다.

최종 실행 결과 값으로 10을 반환한다.

 

2. 먼저 파라미터가 2개인 reduce 함수

 - reduce 함수의 첫번째 파라미터는 초기 값을 의미한다.

        //T reduce(T identity, BinaryOperator<T> accumulator);
        int reduced2 = IntStream.range(1, 5) // [1, 2, 3, 4]
                        .reduce(10, (a, b) -> {
                            System.out.println("a = " + a);
                            System.out.println("b = " + b);
                            return Integer.sum(a, b);
                        });

        System.out.println(reduced2);

 

첫번째 실행

 - a : 초기 값 = 10

 - b : Stream 의 첫번째 값 = 1

 - a + b = 11 을 반환(return) 한다.

두번째 실행

 - a : 첫번째 실행한 결과 값 = 11

 - b : Stream 의 다음(두번째) 값 = 2

 - a + b = 13 을 반환(return) 한다.

세번째 실행

 - a : 두번째 실행한 결과 값 = 13

 - b : Stream 의 다음(세번째) 값 = 3

 - a + b = 16 을 반환(return) 한다.

네번째 실행

 - a : 세번째 실행한 결과 값 = 16

 - b : Stream 의 다음(네번째) 값 = 4

 - a + b = 20 을 반환(return) 한다.

최종 실행 결과 값으로 20을 반환한다.

 

 

위에서 살펴본 Stream reduce 두 함수의 차이는 초기 값이 있냐 없냐의 차이이다.

그리고 Stream reduce 함수에서 가장 중요하게 봐야할 부분은 accumulator 의 동작 방식이다.

첫번째, 두번째 인자에 값이 어떻게 전달되어 최종 결과값을 반환하는지에 대한 이해만 있다면 어렵지 않을 것이다.

 

Stream reduce 에 대한 Javadoc 페이지도 같이 확인하면 좋을 것 같아 링크를 건다.

Stream (Java Platform SE 8 ) (oracle.com)

 

Stream (Java Platform SE 8 )

A sequence of elements supporting sequential and parallel aggregate operations. The following example illustrates an aggregate operation using Stream and IntStream: int sum = widgets.stream() .filter(w -> w.getColor() == RED) .mapToInt(w -> w.getWeight())

docs.oracle.com

 

반응형
반응형

다트는 동기/비동기 프로그래밍을 지원한다.

동기 : 요청을 하고 나서 응답이 올 때까지 기다렸다가 응답.

비동기 : 요청을 하고 나서 응답을 받지 않아도 다음 코드를 진행. 추후에 응답이 오면 처리.

 

Future

- Future 클래스 : 미래에 제너릭으로 값을 받아옴.

// 일정 시간 후 콜백 함수 실행.
// Future.delayed

void main() {
  print("실행 시작");
  Future.delayed(Duration(seconds: 3), (){
    print('3초 후 실행합니다.');
  });
  print("실행 완료?");
}

// 실행 결과(비동기로 동작)
실행 시작
실행 완료?
3초 후 실행합니다.

 

async, await

- 코드를 순서대로 실행시키기 위해 사용

void main() {
  addNumbers(1, 1);
}

// async 키워드는 함수 매개변수 정의와 바디 사이에 입력.
void addNumbers(int number1, int number2) async {
  print('$number1 + $number2 계산 시작!');

  // await는 대기하고 싶은 비동기 함수 앞에 입력.
  await Future.delayed(Duration(seconds: 3), (){
    print('$number1 + $number2 = ${number1 + number2}');
  });

  print('$number1 + $number2 코드 실행 끝');
}

// 실행 결과
1 + 1 계산 시작!
1 + 1 = 2
1 + 1 코드 실행 끝

 

비동기적으로 동작하는 지 확인

void main() {
  addNumbers(1, 1);
  addNumbers(2, 2);
}

Future<void> addNumbers(int number1, int number2) async {
  print('$number1 + $number2 계산 시작!');

  await Future.delayed(Duration(seconds: 3), (){
    print('$number1 + $number2 = ${number1 + number2}');
  });

  print('$number1 + $number2 코드 실행 끝');
}

// 실행 결과
1 + 1 계산 시작!
2 + 2 계산 시작!
1 + 1 = 2
1 + 1 코드 실행 끝
2 + 2 = 4
2 + 2 코드 실행 끝

 

두 함수를 순차적으로 실행하고 싶다면...

// main 함수에 async, await 사용.
void main() async {
  await addNumbers(1, 1);
  await addNumbers(2, 2);
}

Future<void> addNumbers(int number1, int number2) async {
  print('$number1 + $number2 계산 시작!');

  await Future.delayed(Duration(seconds: 3), (){
    print('$number1 + $number2 = ${number1 + number2}');
  });

  print('$number1 + $number2 코드 실행 끝');
}

// 실행 결과
1 + 1 계산 시작!
1 + 1 = 2
1 + 1 코드 실행 끝
2 + 2 계산 시작!
2 + 2 = 4
2 + 2 코드 실행 끝

 

결과 값을 비동기로 리턴 받고 싶다면...

void main() async {
  final result = await addNumbers(1, 1);
  print('결과값 $result');  // 일반 함수와 동일하게 반환값을 받을 수 있음
  final result2 = await addNumbers(2, 2);
  print('결과값 $result2');
}

Future<int> addNumbers(int number1, int number2) async {
  print('$number1 + $number2 계산 시작!');

  await Future.delayed(Duration(seconds: 3), (){
    print('$number1 + $number2 = ${number1 + number2}');
  });

  print('$number1 + $number2 코드 실행 끝');

  return number1 + number2;
}

//
1 + 1 계산 시작!
1 + 1 = 2
1 + 1 코드 실행 끝
결과값 2
2 + 2 계산 시작!
2 + 2 = 4
2 + 2 코드 실행 끝
결과값 4

 

 

Stream

- Future 는 결과값을 한번 리턴.

- 지속적으로 리턴받고 싶을 때는 Stream 을 사용.

- Stream 은 한번 Listen 하면 지속적으로 값을 받아온다.

import 'dart:async';

void main() {
  final controller = StreamController();  // StreamController 선언
  final stream = controller.stream;  // Stream 가져오기

  // Stream에 listen() 함수를 실행하면 값이 주입될 때마다 콜백 함수를 실행
  final streamListener1 = stream.listen((val) {
    print(val);
  });

  // Stream에 값을 주입할 때는 sink.add() 함수를 실행
  controller.sink.add(1);
  controller.sink.add(2);
  controller.sink.add(3);
  controller.sink.add(4);
}

// 실행 결과
1
2
3
4

 

Broadcasting

- Stream 을 여러번 Listen 하기

import 'dart:async';

void main() {
  final controller = StreamController();

  // 여러 번 listen할 수 있는 Broadcast Stream 객체 생성
  final stream = controller.stream.asBroadcastStream();

  // listen() 함수
  final streamListener1 = stream.listen((val) {
    print('listening 1');
    print(val);
  });

  // listen() 함수 추갸.
  final streamListener2 = stream.listen((val) {
    print('listening 2');
    print(val);
  });

  // add()를 실행할 때마다 listen()하는 모든 콜백 함수에 값이 주입
  controller.sink.add(1);
  controller.sink.add(2);
  controller.sink.add(3);

}

// 실행 결과.
listening 1
1
listening 2
1
listening 1
2
listening 2
2
listening 1
3
listening 2
3

 

함수로 Stream 반환하기

- 이건 잘 모르겠음. --;;

import 'dart:async';

// Stream을 반환하는 함수는 async*로 선언합니다.
Stream<String> calculate() async* {
  for (int i = 0; i < 5; i++) {
    // StreamController의 add()처럼 yield 키워드를 이용해서 값 반환
    yield 'i = $i';
    await Future.delayed(Duration(seconds: 1));
  }
}

void playStream() {
  // StreamController와 마찬가지로 listen() 함수로 콜백 함수 입력
  calculate().listen((val) {
    print(val);
  });
}

void main() {
  playStream();
}

// 실행 결과(1초에 한줄씩 출력)
i = 0
i = 1
i = 2
i = 3
i = 4

 

이번에는 비동기 프로그래밍에 대해서 알아봤다.

점점 어려워지는구만..ㅋㅋ

반응형

+ Recent posts