はじめに #
re:Invent 2025で発表されたCloudWatch Pipelinesが気になっていたので、ログの変換・正規化がどこまでマネージドにできるのか実際に試してみました。
CloudWatch Pipelinesは、ログデータの収集・変換・ルーティングをフルマネージドで提供するサービスです。2025年12月のre:Inventで発表されました。
ログ処理でよくある「フォーマットがバラバラ問題」を、インフラ管理なし・パイプライン処理自体は追加料金なしで解決できるのがポイントです(ソース種別によって料金の計上タイミングは異なるので後述)。
従来のログ変換、どうしてた? #
CloudWatch Logsに流れてくるログを加工したいとき、これまではこんな構成が多かったと思います。
パターン1: Lambda経由
flowchart LR
A[CloudWatch Logs] --> B[Subscription Filter]
B --> C[Lambda]
C -- 加工 --> D[CloudWatch Logs<br/>別ロググループ]
パターン2: Firehose経由
flowchart LR
A[CloudWatch Logs] --> B[Subscription Filter]
B --> C[Kinesis Firehose]
C --> D[Lambda Transform]
D --> E[S3 / OpenSearch]
どちらも自前で管理するコンポーネントが多いのが課題でした。
- Lambdaの実装・デプロイ・エラーハンドリング
- Subscription Filterの上限(ロググループあたり2個まで)
- Firehoseのバッファリング設定やリトライ管理
- スケーリングの考慮
「JSONのフィールド名を統一したいだけなのに、そこまで必要?」というケースがわりとあります。
Pipelinesのアーキテクチャ #
CloudWatch Pipelinesは Source → Processor → Sink の3層構成です。
flowchart LR
subgraph Source
direction TB
S1[CloudWatch Logs]
S2[S3<br/>SQS経由]
S3[サードパーティ<br/>CrowdStrike等<br/>API or S3+SQS]
S1 ~~~ S2 ~~~ S3
end
subgraph Processors["Processors (合計最大20)"]
direction TB
P1["Parser<br/>(任意1つ・必ず先頭)<br/>例: parse_json / ocsf / grok"]
subgraph Others["以降は順不同・繰り返し可"]
direction TB
P2["Transformation系<br/>キー/値の追加・削除・リネーム<br/>例: add_entries / delete_entries / move_keys"]
P3["String系<br/>文字列値の変換<br/>例: lowercase_string / uppercase_string"]
end
P1 --> Others
end
subgraph Sink
D1[CloudWatch Logs]
end
Source --> Processors --> Sink
| コンポーネント | 必須 | 個数 | 説明 |
|---|---|---|---|
| Source | Yes | 1 | データソース。CloudWatch Logs、S3 (SQS経由)、サードパーティ(API直連携 または S3+SQS) |
| Processor | No | 0〜20 | 順序付きの変換処理。Parser(parse_json/ocsf/grok 等)は最大1つかつ先頭固定。それ以降は Transformation系 / String系 等を順不同・繰り返し可で並べられる |
| Sink | Yes | 1 | 出力先。現時点ではCloudWatch Logsのみ |
従来手法との比較 #
| 項目 | CloudWatch Pipelines | Kinesis Firehose + Lambda | Subscription Filter + Lambda |
|---|---|---|---|
| マネージド | フルマネージド | Firehoseはマネージド、Lambdaは自前 | Lambdaは自前 |
| 追加コスト | パイプライン処理自体は追加料金なし(CWLソースは処理前、S3/サードパーティは処理後のCustom logsとして課金) | Firehose配信料 + Lambda実行料 | Lambda実行料 |
| ログ変換 | 組み込みプロセッサ | Lambda内で自由に実装 | Lambda内で自由に実装 |
| OCSF変換 | 組み込みサポート | 自前実装 | 自前実装 |
| 出力先 | CloudWatch Logsのみ | S3, Redshift, OpenSearch, Splunk等 | 任意(Lambda次第) |
| インフラ管理 | 不要 | Firehose設定 + Lambda管理 | Lambda管理 |
| サードパーティ取り込み | CrowdStrike、Okta、Palo Alto、Wiz、Zscaler 等を組み込みサポート | なし | なし |
Pipelinesが向いているケース:
- ログのJSON正規化、フィールド追加/削除、文字列変換などの定型処理
- OCSF準拠が必要なセキュリティログ
- CrowdStrike, Okta等のサードパーティログをCloudWatchに集約したい
- Lambdaを書きたくない(管理したくない)
従来手法が向いているケース:
- CloudWatch Logs以外への出力が必要(S3, OpenSearch等)
- 複雑なビジネスロジックを含む変換
- リアルタイムで外部に転送したい
要するに、**「CloudWatch Logs内で完結するログ変換ならPipelines一択」**という棲み分けです。
ハンズオン:アプリケーションログをJSON正規化する #
実際にパイプラインを組んで、バラバラなフォーマットのアプリケーションログを正規化してみます。
やりたいこと #
アプリケーションから流れてくるログのフォーマットがバラバラ:
// ログA(構造化済み)
{"timestamp": "2026-04-12T10:00:00Z", "level": "INFO", "message": "Order created", "userId": "u-123"}
// ログB(フィールド名が違う)
{"ts": "2026-04-12T10:01:00Z", "severity": "ERROR", "msg": "Payment failed", "user_id": "u-456"}
これを統一したい:
ts→timestamp、severity→level、msg→message、user_id→userIdlevelを小文字に統一- 環境情報(
environment: production)を付与
CloudFormationテンプレート #
AWSTemplateFormatVersion: '2010-09-09'
Description: CloudWatch Pipeline - Log Normalization Example
Resources:
# 対象のロググループ
AppLogGroup:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: /app/my-service
RetentionInDays: 14
# CloudWatch Logs ソース用 IAMロール
# source.cloudwatch_logs.aws.sts_role_arn に指定するロール。
# CloudWatch Logs サービス(logs.amazonaws.com)が assume して、
# 対象ロググループからイベントを読み取るために使う。
# ※ パイプライン自体を作成する側の呼び出し者には
# 別途 logs:PutPipelineRule / logs:DeletePipelineRule 等の権限が必要。
PipelineSourceRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: logs.amazonaws.com
Action: sts:AssumeRole
# パイプライン本体
LogNormalizationPipeline:
Type: AWS::ObservabilityAdmin::TelemetryPipelines
Properties:
Name: app-log-normalizer
Configuration:
Body: !Sub |
pipeline:
source:
cloudwatch_logs:
log_event_metadata:
data_source_name: "my_app_logs"
data_source_type: "default"
aws:
sts_role_arn: "${PipelineSourceRole.Arn}"
processor:
- parse_json: {}
- move_keys:
entries:
- from_key: "ts"
to_key: "timestamp"
overwrite_if_to_key_exists: false
- from_key: "severity"
to_key: "level"
overwrite_if_to_key_exists: false
- from_key: "msg"
to_key: "message"
overwrite_if_to_key_exists: false
- from_key: "user_id"
to_key: "userId"
overwrite_if_to_key_exists: false
- lowercase_string:
with_keys:
- "level"
- add_entries:
entries:
- key: "environment"
value: "production"
overwrite_if_key_exists: false
sink:
- cloudwatch_logs:
log_group: "@original"
Tags:
- Key: Purpose
Value: log-normalization
Outputs:
PipelineArn:
Value: !GetAtt LogNormalizationPipeline.Arn
ポイント:
parse_json→move_keys→lowercase_string→add_entriesの順でプロセッサをチェーンparse_jsonは parser processor なので先頭に1つだけ配置(ocsfやgrokも同じ枠で、混在不可)sinkのlog_group: "@original"は元のロググループにそのまま書き戻す設定- パイプライン処理自体には追加料金が発生しない(CloudWatch Logs ソースの場合は処理前に通常の取り込み料金として課金)
デプロイと動作確認 #
# デプロイ
aws cloudformation deploy \
--template-file template.yaml \
--stack-name cw-pipeline-demo \
--capabilities CAPABILITY_IAM \
--region ap-northeast-1
# ログストリームを事前に作成(put-log-events の前提)
aws logs create-log-stream \
--log-group-name /app/my-service \
--log-stream-name test-stream \
--region ap-northeast-1
# テストログを送信
aws logs put-log-events \
--log-group-name /app/my-service \
--log-stream-name test-stream \
--log-events \
timestamp=$(date +%s000),message='{"ts":"2026-04-12T10:01:00Z","severity":"ERROR","msg":"Payment failed","user_id":"u-456"}' \
--region ap-northeast-1
# ※ 2回目以降は describe-log-streams で uploadSequenceToken を取得し
# --sequence-token で渡す必要があります
変換結果 #
パイプライン通過後のログ:
{
"timestamp": "2026-04-12T10:01:00Z",
"level": "error",
"message": "Payment failed",
"userId": "u-456",
"environment": "production"
}
ts→timestampにリネームseverity→levelにリネーム + 小文字化msg→messageにリネームuser_id→userIdにリネームenvironment: productionが付与
Lambdaのコードを1行も書かずに、ログの正規化ができました。
OCSF変換も試してみる #
セキュリティログの標準フォーマットであるOCSF(Open Cybersecurity Schema Framework)への変換も組み込みサポートされています。CloudTrail、VPC Flow Logs、Route 53 Resolverログ、AWS WAFなど主要なAWSサービスに加え、CrowdStrike、Okta、Palo Alto等のサードパーティログもOCSFに変換できます。
# OCSF変換パイプラインの例
# (自前のロググループにCloudTrail形式JSONを流している想定 = custom logs)
pipeline:
source:
cloudwatch_logs:
log_event_metadata:
data_source_name: "cloudtrail_logs" # 任意の識別名
data_source_type: "default" # customログは default
aws:
sts_role_arn: "arn:aws:iam::123456789012:role/PipelineSourceRole"
processor:
- ocsf:
version: "1.5"
schema:
cloud_trail:
sink:
- cloudwatch_logs:
log_group: "@original"
AWS vended の CloudTrail ログ(
data_source_name: aws_cloudtrail)を直接扱う場合は、使えるプロセッサがocsfのみ、または空([]) に制限されます。move_keys等との組み合わせができないので、上記のように「自前ロググループ + default ソース」として扱う方が柔軟性が高い場面もあります。
これだけで、CloudTrailのJSON形式ログがOCSF v1.5準拠に変換されます。Security Lakeやサードパーティ SIEMとの連携で「OCSF準拠にしてくれ」と言われたときに、Lambda不要で対応できるのは嬉しい。
CloudWatch Logs ソース側で対応している schema は、現時点で
cloud_trail/route53_resolver/vpc_flow/eks_audit/aws_waf等です。最新の一覧はOCSF processor のドキュメントを参照してください。
知っておくべき制限事項 #
| 制限 | 詳細 |
|---|---|
| Sinkの出力先 | CloudWatch Logsのみ(S3やOpenSearchには直接出力できない) |
| パイプライン数 | アカウントあたり最大330(CW Logsソース300 + その他30) |
| プロセッサ数 | パイプラインあたり最大20 |
| パーサープロセッサ | パイプラインあたり1つまで、かつ先頭に配置 |
| イベントサイズ | 変換後256KB以下 |
include_original |
有効にすると元ログと変換後ログ両方保存(CWLソース限定、processor1つ以上必須、{} で指定)。ストレージコストは変換後サイズ・保持期間次第だが単純には増加 |
| CW Logsソースのsink | @original(元のロググループ)固定。別ロググループへの転送は不可 |
一番大きい制限は出力先がCloudWatch Logsのみという点。変換後のログをS3に出したい場合は、Pipeline通過後にSubscription Filter → Firehose → S3という従来の構成を追加する必要があります。
モニタリング #
パイプラインのメトリクスは AWS/Observability Admin 名前空間で確認できます。
PipelineErrors— エラー発生数。アラーム設定推奨PipelineErrorsByErrorType— エラー種別ディメンション付き。ACCESS_DENIED/PARSE_FAILURE/PROCESSOR_ERRORS/PAYLOAD_SIZE_EXCEEDED/RESOURCE_NOT_FOUND/SOURCE_READ_FAILURE/ALLなどが用意されている(最新は公式ドキュメント参照)PipelineWarnings—THROTTLED(レート超過)等PipelineBytesIn/PipelineBytesOut— スループットPipelineRecordsIn/PipelineRecordsOut— レコード数
特に PARSE_FAILURE はフォーマットが想定と異なるログが流れてきたときに発生するので、初期は注意して見ておくのが良いです。
最後に #
- 「CloudWatch Logs内で完結するログ変換」に最適 — パイプライン処理自体は追加料金なし、インフラ管理なし(ソース種別で課金タイミングが異なる点は注意)
- OCSF変換がビルトイン — セキュリティログの標準化がLambdaなしで可能
- サードパーティ連携が充実 — CrowdStrike、Okta、Palo Alto、Wiz、Zscaler 等のログを API 直連携または S3+SQS で取り込める
- 制限はSinkがCloudWatch Logsのみ — S3やOpenSearchに出したい場合は従来手法と組み合わせが必要
Log Transformers(2024年11月発表)の進化版として、ログ変換パイプラインの選択肢がかなり広がりました。「ログのフォーマット統一のためだけにLambda書くのつらい」と思っていた人は、まず試してみる価値があると思います。
参考リンク #
- Amazon CloudWatch introduces unified data management and analytics | AWS News Blog
- CloudWatch Pipelines Documentation
- Pipeline Processors
- Pipeline Sinks
- CloudWatch Logs Transform and Enrich (Transformers)
- CloudWatch Pipelines Compliance and Governance
- CloudWatch Pricing