본문 바로가기

AWS

Elastic Search - Cloudwatch 연동

연동을 위한 Elasticsearch 도메인 생성

AWS Console에서 ElasticSearch Service에 접속 후 도메인 생성

 

IP로 접근 하기 위한 퍼블릭 액세스 설정 및 접근 허용

생성 후 로드가 되고 난 후의 Elasticsearch의 모습

엔드포인트 및 Kibana 연동도 자동으로 되는 것을 알 수 있다.

지난 번 생성한 로그 그룹에서 Elasticsearch 구독 필터 생성

로그 그룹 → 우측 상단 작업 → 구독 필터 → Elasticsearch 구독 필터 생성

설정을 완료한 후 구독 필터가 생성된 모습

생성된 스트림을 Lambda에서 확인할 수 있다.

가서 소스 코드 변경.

소스 코드는 아래와 같이 입력한다.

아래 소스는 node.js 기반으로 log 및 slowquery을 추출해주는 코드이다.

// v1.1.2
var https = require('https');
var zlib = require('zlib');
var crypto = require('crypto');

var endpoint = 'search-slowquery-lpj2p5utan77wqwln5dyetm4vy.ap-northeast-2.es.amazonaws.com';

// Set this to true if you want to debug why data isn't making it to
// your Elasticsearch cluster. This will enable logging of failed items
// to CloudWatch Logs.
var logFailedResponses = false;
 
exports.handler = function(input, context) {
    // decode input from base64
    var zippedInput = new Buffer.from(input.awslogs.data, 'base64');
     
    // decompress the input
    zlib.gunzip(zippedInput, function(error, buffer) {
        if (error) { context.fail(error); return; }
 
        // parse the input from JSON
        var awslogsData = JSON.parse(buffer.toString('utf8'));
 
        // transform the input to Elasticsearch documents
        var elasticsearchBulkData = transform(awslogsData);
 
        // skip control messages
        if (!elasticsearchBulkData) {
            console.log('Received a control message');
            context.succeed('Control message handled successfully');
            return;
        }
 
        // post documents to the Amazon Elasticsearch Service
        post(elasticsearchBulkData, function(error, success, statusCode, failedItems) {
            console.log('Response: ' + JSON.stringify({
                "statusCode": statusCode
            }));
 
            if (error) {
                logFailure(error, failedItems);
                context.fail(JSON.stringify(error));
            } else {
                console.log('Success: ' + JSON.stringify(success));
                context.succeed('Success');
            }
        });
    });
};
 
function transform(payload) {
    if (payload.messageType === 'CONTROL_MESSAGE') {
        return null;
    }
    var bulkRequestBody = '';
 
    payload.logEvents.forEach(function(logEvent) {
        var timestamp = new Date(1 * logEvent.timestamp);
 
        // index name format: cwl-YYYY.MM.DD Slowquery modify 1
/*       
        var indexName = [
            'cwl-' + timestamp.getUTCFullYear(),              // year
            ('0' + (timestamp.getUTCMonth() + 1)).slice(-2),  // month
            ('0' + timestamp.getUTCDate()).slice(-2)          // day
        ].join('.');
*/
        var indexName = [
            'cwl' + payload.logGroup.toLowerCase().split('/').join('-') + '-' + timestamp.getUTCFullYear(),              // log group + year
            ('0' + (timestamp.getUTCMonth() + 1)).slice(-2),  // month
            ('0' + timestamp.getUTCDate()).slice(-2)          // day
        ].join('.');
         
        var source = buildSource(logEvent.message, logEvent.extractedFields);
        source['@id'] = logEvent.id;
        source['@timestamp'] = new Date(1 * logEvent.timestamp).toISOString();
        source['@message'] = logEvent.message;
        source['@owner'] = payload.owner;
        source['@log_group'] = payload.logGroup;
        source['@log_stream'] = payload.logStream;
 
        var action = { "index": {} };
        action.index._index = indexName;
        action.index._type = payload.logGroup;
        action.index._id = logEvent.id;
         
        bulkRequestBody += [
            JSON.stringify(action),
            JSON.stringify(source),
        ].join('\n') + '\n';
    });
    return bulkRequestBody;
}
 
function buildSource(message, extractedFields) {
 
    if (extractedFields) {
        var source = {};
 
        for (var key in extractedFields) {
            if (extractedFields.hasOwnProperty(key) && extractedFields[key]) {
                var value = extractedFields[key];
 
                if (isNumeric(value)) {
                    source[key] = 1 * value;
                    continue;
                }
 
                jsonSubString = extractJson(value);
                if (jsonSubString !== null) {
                    source['$' + key] = JSON.parse(jsonSubString);
                }
 
                source[key] = value;
            }
        }
        return source;
    }

    jsonSubString = extractJson(message);
    if (jsonSubString !== null) {
        return JSON.parse(jsonSubString);
    }
 
    return {};
}
 
function extractJson(message) {
    var jsonStart = message.indexOf('{');
    if (jsonStart < 0) return null;
    var jsonSubString = message.substring(jsonStart);
    return isValidJson(jsonSubString) ? jsonSubString : null;
}
 
function isValidJson(message) {
    try {
        JSON.parse(message);
    } catch (e) { return false; }
    return true;
}
 
function isNumeric(n) {
    return !isNaN(parseFloat(n)) && isFinite(n);
}
 
function post(body, callback) {
    var requestParams = buildRequest(endpoint, body);
 
    var request = https.request(requestParams, function(response) {
        var responseBody = '';
        response.on('data', function(chunk) {
            responseBody += chunk;
        });
 
        response.on('end', function() {
            var info = JSON.parse(responseBody);
            var failedItems;
            var success;
            var error;
             
            if (response.statusCode >= 200 && response.statusCode < 299) {
                failedItems = info.items.filter(function(x) {
                    return x.index.status >= 300;
                });
 
                success = {
                    "attemptedItems": info.items.length,
                    "successfulItems": info.items.length - failedItems.length,
                    "failedItems": failedItems.length
                };
            }
 
            if (response.statusCode !== 200 || info.errors === true) {
                // prevents logging of failed entries, but allows logging
                // of other errors such as access restrictions
                delete info.items;
                error = {
                    statusCode: response.statusCode,
                    responseBody: info
                };
            }
 
            callback(error, success, response.statusCode, failedItems);
        });
    }).on('error', function(e) {
        callback(e);
    });
    request.end(requestParams.body);
}
 
function buildRequest(endpoint, body) {
    var endpointParts = endpoint.match(/^([^\.]+)\.?([^\.]*)\.?([^\.]*)\.amazonaws\.com$/);
    var region = endpointParts[2];
    var service = endpointParts[3];
    var datetime = (new Date()).toISOString().replace(/[:\-]|\.\d{3}/g, '');
    var date = datetime.substr(0, 8);
    var kDate = hmac('AWS4' + process.env.AWS_SECRET_ACCESS_KEY, date);
    var kRegion = hmac(kDate, region);
    var kService = hmac(kRegion, service);
    var kSigning = hmac(kService, 'aws4_request');
     
    var request = {
        host: endpoint,
        method: 'POST',
        path: '/_bulk',
        body: body,
        headers: {
            'Content-Type': 'application/json',
            'Host': endpoint,
            'Content-Length': Buffer.byteLength(body),
            'X-Amz-Security-Token': process.env.AWS_SESSION_TOKEN,
            'X-Amz-Date': datetime
        }
    };
 
    var canonicalHeaders = Object.keys(request.headers)
        .sort(function(a, b) { return a.toLowerCase() < b.toLowerCase() ? -1 : 1; })
        .map(function(k) { return k.toLowerCase() + ':' + request.headers[k]; })
        .join('\n');
 
    var signedHeaders = Object.keys(request.headers)
        .map(function(k) { return k.toLowerCase(); })
        .sort()
        .join(';');
 
    var canonicalString = [
        request.method,
        request.path, '',
        canonicalHeaders, '',
        signedHeaders,
        hash(request.body, 'hex'),
    ].join('\n');
 
    var credentialString = [ date, region, service, 'aws4_request' ].join('/');
 
    var stringToSign = [
        'AWS4-HMAC-SHA256',
        datetime,
        credentialString,
        hash(canonicalString, 'hex')
    ] .join('\n');
 
    request.headers.Authorization = [
        'AWS4-HMAC-SHA256 Credential=' + process.env.AWS_ACCESS_KEY_ID + '/' + credentialString,
        'SignedHeaders=' + signedHeaders,
        'Signature=' + hmac(kSigning, stringToSign, 'hex')
    ].join(', ');
 
    return request;
}
 
function hmac(key, str, encoding) {
    return crypto.createHmac('sha256', key).update(str, 'utf8').digest(encoding);
}
 
function hash(str, encoding) {
    return crypto.createHash('sha256').update(str, 'utf8').digest(encoding);
}
 
function logFailure(error, failedItems) {
    if (logFailedResponses) {
        console.log('Error: ' + JSON.stringify(error, null, 2));
 
        if (failedItems && failedItems.length > 0) {
            console.log("Failed Items: " +
                JSON.stringify(failedItems, null, 2));
        }
    }
}

 

소스 코드 입력 후 테스트는 아래와 같이 할 수 있도록 한다.

포맷을 Amazon Cloudwatch log로 지정하면 아래와 같은 화면을 볼 수 있다.

지정을 해주지 않을 경우 DataType이 맞지 않는 에러가 발생한다.

 

성공적으로 Lambda 함수가 실행된 모습이다.

추가적으로 Elasticsearch Service에 데이터를 넣는 작업이므로 기존 로그 그룹 설정 Role에서

Role을 다음과 같이 입력해야 403 오류 (권한) 없이 실행이 가능하다.

Elasticsearch 인덱스 및 Kibana에 성공적으로 데이터가 입력된 것을 확인할 수 있다.

'AWS' 카테고리의 다른 글

RDS에서 DDB로 Data Insert  (0) 2021.04.12
AWS Command Line Interface (AWS CLI)  (0) 2021.04.12
AWS Lambda  (0) 2021.04.12
Boto3 (AWS SDK for Python)  (0) 2021.04.12
Amazon Athena  (0) 2021.04.12