【第4回】AmazonS3で発生したイベント契機で実行するサーバレスアプリケーション(1)


前回は、S3へファイルをダイレクトアップロードするアプリケーションを実装しました。続く今回はアップロードした後の後続処理を AWS Lambdaを使ってサーバレスで実行するアプリケーションを実装していきます。なお、動作するAWS Lambdaファンクションは以下のランタイム・ライブラリを使って実装するものとします。


動作対象 バージョン
Java 1.8
Spring Boot 2.3.1.RELEASE
Spring Cloud Function 3.0.8.RELEASE
Spring Cloud AWS 2.2.2.RELEASE
Spring Data DynamoDB 5.2.1
AmazonSDK Lambda Java event 2.0.2
AmazonSDK Lambda Java core 1.1.0



AWS Lambdaを使ったS3ダイレクトアップロード後の後続処理


前回実装したS3へのダイレクトアップロード処理が完了した後、後処理が必要なケースが多くあります。 例えば、アップロードしたファイルのオブジェクトキーをデータベースに保存したり、アップロードされたファイルのチェックや加工など、 実行したい内容に応じて後続処理を実装しますが、性能負荷を気にせずスケール可能なAWS Lambdaを有力な選択肢の一つとして選択できます。 ただし、Lambdaファンクション内で実装する処理に応じて適切なマネージドサービスやミドルウェアを選択し、エラー発生時のハンドリングも同時に考慮しなければなりません。 代表的な処理パターンとして、以下の図のようなケースが挙げられます。


../_images/serverless-postprocess.png


各処理の詳細は以下の通りです、(1)〜(5)は前回までに実装したS3へのダイレクトアップロード処理に相当します。


S3ダイレクトアップロード時の処理フロー
項番 説明
クライアント(ブラウザ)からファイルをアップロードするための要求リクエストをサーバ(ECSアプリケーション)へ発出します。
ECSアプリケーションはクライアントからS3にダイレクトアップロードさせるよう、STSに一時的な認証情報を要求します。
STSはアプリケーションに一時認証情報を発行します。アプリケーションでは認証情報をもとに、署名したPostポリシーなどのダイレクトアップロードに必要な情報を作成します。
アプリケーションにクライアントにダイレクトアップロードに必要なURLや署名したポリシーなどの情報を返却します。
クライアントはS3へ署名とともにファイルをダイレクトアップロードします。
アップロード先のS3のバケットにファイルがアップロード(Post/Put)されるとイベントを発生させるようS3側に設定を行っておき、アップロードを契機として、AWS Lambdaファンクションを実行させます。
アップロードしたファイルに応じて後処理を実行します。
(7') アップロードしたファイルのオブジェクトキーやファイルサイズといったメタデータをDynamoDBに保存します。 DynamoDBは、連載クラウドネイティブアプリケーション基本編の 第15回 で解説したように、Lambdaと同様、スケーラビリティを有するデータベースですので、大量の書き込みアクセスが発生するようなユースケースでのデータ保存が適しています。 大量のファイルが多数のクライアントから常時アップロードするような場合に向いているケースです。
(7'') アップロードしたファイルのメタ情報や結果をVPC内で動いているサーバへ連携し、RDSなどに保存したり、クライアントへ通知するためにSQSを使ってメッセージキューとして送信します。※
(7''') アップロード後の処理の中でエラーが発生した場合、CloudWatch Logsにログを出力して、それを契機として再びLambdaファンクションを起動し、デッドレターキューを送信します。
蓄積されたキューに対し、アプリケーションからポーリングしてメッセージキューを取得します。
受け取ったキューから情報取得して、バックエンドのRDSへ接続するサービスを呼び出します。
バックエンドのRDSへアップロードしたファイルのメタデータを保存します。RDS内の別のテーブルのデータとジョインさせたりする必要がある場合に向いているケースです。
クライアントに対して結果を通知する等の場合に、クライアントとWebSocketsコネクションを確立しているサーバとの連携用途でElastiCacheへメッセージを保存します。
クライアントとWebSocketsコネクションを確立しているサーバがクライアントへプッシュ通知します。この実装例は次回以降詳説します。


注釈

AWS LambdaをVPCにアタッチして、直接RDSなどへアクセスする処理を記載することも可能ですが、リクエスト数によってはコネクションが多数張られてRDSに過負荷となるため、SQSなどを経由して、コネクションが確立されているサーバを経由するか、RDS Proxy など使用することを検討します。


(8)-(10)はクラウドネイティブ基本編 第6回第12回第32回 で解説しているので、適宜参照してください。 本連載では、(6)-(7)や、(11)-(12)を順次実装し説明していきます。


S3アップロード後に実行するLambdaファンクションの実装


まず、後処理として実行するLambdaファンクションを実装します。本連載で実際に作成するアプリケーションでは GitHub 上にコミットしています。 以降に記載するソースコードでは、import文など本質的でない記述を省略している部分があるので、実行コードを作成する際は、必要に応じて適宜GitHubにあるソースコードも参照してください。

アプリケーションの実装はクラウドネイティブアプリケーションの基本 第一回 と同様、Spring Cloud Functionを使用したアプリケーションで実装します。 必要なライブラリは以下の通りですが、DynamoDBとSQSへアクセスするので、Spring Cloud AWSと、Spring Data DynamoDBのライブラリを追加します。


<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-function-context</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-function-adapter-aws</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-aws</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-aws-messaging</artifactId>
</dependency>
<dependency>
  <groupId>com.amazonaws</groupId>
  <artifactId>aws-lambda-java-events</artifactId>
  <version>2.0.2</version>
</dependency>
<dependency>
  <groupId>com.amazonaws</groupId>
  <artifactId>aws-lambda-java-core</artifactId>
  <version>1.1.0</version>
</dependency>
<dependency>
  <groupId>io.github.boostchicken</groupId>
  <artifactId>spring-data-dynamodb</artifactId>
  <version>5.2.1</version>
</dependency>


それでは、第一回同様、HandlerクラスとイベントとしてLamnda関数から実行されるFunctionクラスから解説を進みます。今回は、S3のファイルアップロードを契機にイベント受信するので、 HandlerクラスはSpirngBootRequestHandlerを継承して作成します。継承するクラスの型パラメータとしては、Inputにcom.amazonaws.services.s3.event.S3EventNotificationを、OutputにはString型を指定します。


package org.debugroom.mynavi.sample.aws.lambda.s3event.app.handler;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.s3.event.S3EventNotification;
import org.springframework.cloud.function.adapter.aws.SpringBootRequestHandler;

public class S3UploadEventHandler extends
     SpringBootRequestHandler<S3EventNotification, String> {

    @Override
    public Object handleRequest(S3EventNotification event, Context context) {
        return super.handleRequest(event, context);
    }

}


続いて、Functionクラスです。S3EventNotificationをFlux型で受け取るため、そこからイベントレコードを抽出して、DynamoDBとSQSキューを送信するサービスクラスを作成して呼び出す実装とします。


package org.debugroom.mynavi.sample.aws.lambda.s3event.app.function;

import java.util.function.Function;
import reactor.core.publisher.Flux;

import com.amazonaws.services.s3.event.S3EventNotification;
import org.springframework.beans.factory.annotation.Autowired;

import org.debugroom.mynavi.sample.aws.lambda.s3event.app.moodel.UploadFileMapper;
import org.debugroom.mynavi.sample.aws.lambda.s3event.domain.service.SampleService;

public class SampleFunction implements
     Function<Flux<S3EventNotification>, Flux<String>> {

    @Autowired
    SampleService sampleService;

    @Override
    public Flux<String> apply(Flux<S3EventNotification> s3EventNotificationFlux) {

        s3EventNotificationFlux.subscribe(s ->{
            if(0 != s.getRecords().size()){
                sampleService.uploadPostProcess(
                  UploadFileMapper.map(s.getRecords().get(0)));
            }
        });
        return Flux.just("Complete!");
    }

}


サービスクラスでは、DynamoDBのテーブルへアクセスするUploadFileRepositoryクラスとメッセージキューを送信する処理をまとめたMessageRepositoryを呼び出す処理を実装します。 SQSメッセージはJacksonのマッパーライブラリを使って、JSON文字列に変換して送信します。


package org.debugroom.mynavi.sample.aws.lambda.s3event.domain.service;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.debugroom.mynavi.sample.aws.lambda.s3event.domain.model.UploadFile;
import org.debugroom.mynavi.sample.aws.lambda.s3event.domain.repository.MessageRepository;
import org.debugroom.mynavi.sample.aws.lambda.s3event.domain.repository.UploadFileRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.UUID;

@Service
public class SampleServiceImpl implements SampleService{

    @Autowired
    ObjectMapper objectMapper;

    @Autowired
    MessageRepository messageRepository;

    @Autowired
    UploadFileRepository uploadFileRepository;

    @Override
    public void uploadPostProcess(UploadFile uploadFile) {
        uploadFile.setFileId(UUID.randomUUID().toString());
        uploadFileRepository.save(uploadFile);
        try{
            messageRepository.save(objectMapper.writeValueAsString(uploadFile));
        }catch (JsonProcessingException e){
            e.printStackTrace();
        }
    }

}


DynamoDBとSQSキューを送信する処理や設定の詳細については、クラウドネイティブ基本編 第17回第29回 を参照してください。 また、DynamoDBやSQSのエンドポイント等の必要な情報はCloudFormationのスタック情報から取得するようにします。こちらの詳細は、「AWSで実践!基盤・デプロイ自動化」の 第34回 を参照してください。 今回の実装で、DynamoDBでは以下のようなモデル情報をS3EventNotificationから受け取って保存するupload-file-tableを作成するものとします。


package org.debugroom.mynavi.sample.aws.lambda.s3event.domain.model;

// omit

@DynamoDBTable(tableName = "upload-file-table")
public class UploadFile {

    @Id
    @DynamoDBHashKey
    private String fileId;
    @DynamoDBAttribute
    private String awsRegion;
    @DynamoDBAttribute
    private String objectKey;
    @DynamoDBAttribute
    private String eventName;
    @DynamoDBAttribute
    private String eventSource;
    @DynamoDBAttribute
    private String eventVersion;
    @DynamoDBAttribute
    private String eventTime;
    @DynamoDBAttribute
    private String ipAddress;
    @DynamoDBAttribute
    private String principalId;
    @DynamoDBAttribute
    private String bucketName;
    @DynamoDBAttribute
    private String arn;
    @DynamoDBAttribute
    private String ownerIdentity;
    @DynamoDBAttribute
    private String urlDecordedKey;
    @DynamoDBAttribute
    private String size;
    @DynamoDBAttribute
    private String sequencer;
    @DynamoDBAttribute
    private String eTag;

}


警告

今回使用したライブラリのバージョンでは、これまで通りSpringBootのオートコンフィグレーション機能を利用していると、アプリケーション実行時に org.springframework.beans.factory.BeanDefinitionStoreException: Failed to process import candidates for configuration class. Caused by: java.lang.ArrayStoreException: sun.reflect.annotation.TypeNotPresentExceptionProxy が発生します。 オートコンフィグレーション機能は、特定のライブラリを含めるだけであらかじめ準備された設定クラスの自動構築を行う機能ですが、今回のケースではspring-cloud-starter-aws-messagingのライブラリを含めたときに有効化される MessagingAutoConfigurationのインナークラスSnsAutoCinfigurationに設定されている@EableSnsアノテーションを取得する際にエラーとなっていました。今回SNSは利用しないので、以下の通り、MessagingAutoConfigurationをオートコンフィグレーション対象から除外して、使用するSQSだけを有効化するよう@EnableSqsを設定クラスに付与する実装で対応しています。


package org.debugroom.mynavi.sample.aws.lambda.s3event.config;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.aws.autoconfigure.messaging.MessagingAutoConfiguration;

// omit

@SpringBootApplication(exclude = MessagingAutoConfiguration.class)
public class App {

    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }

    // omit
}
package org.debugroom.mynavi.sample.aws.lambda.s3event.config;

// omit

@EnableSqs
@Configuration
public class SqsConfig {
  // omit
}


なお、対象のAutoConfiguration機能でのエラー原因を特定するには、エラーが発生している箇所でデバックして特定する必要があります。


今回はS3からイベントを受けて、DynamoDBへの情報の保存とSQSキューを送信するLambdaファンクションの実装を紹介しました。次回は実際にAWS上に、Lambdaのデプロイや環境を構築して実行してみます。


著者紹介

川畑 光平(KAWABATA Kohei) - NTTデータ

../_images/aws_361383_075.jpeg

金融機関システム業務アプリケーション開発・システム基盤担当、ソフトウェア開発自動化関連の研究開発を経て、デジタル技術関連の研究開発・推進に従事。

Red Hat Certified Engineer、Pivotal Certified Spring Professional、AWS Certified Solutions Architect Professional等の資格を持ち、アプリケーション基盤・クラウドなど様々な開発プロジェクト支援にも携わる。

AWS Top Engineers & Ambassadors 選出。

本連載記事の内容に対するご意見・ご質問は Facebook まで。