Gigging as a contractor, those days are over…

The last two years as an IT contractor have been really intense, with both of my contracts ending early and leaving that uneasy unemployed feeling. My last assignment, with a major media company, was a great opportunity to get exposed to lots of design patterns and best practices. Life as a systems reliability engineer was exactly what I needed to see what operating at scale under enterprise conditions looks like. I got the chance to get better at Terraform and CloudFormation, use Chef and GitLab CI, and build containers and deploy them to a Kubernetes cluster in AWS. Just brain-meltingly cool stuff!

Sadly, the pandemic ended my time as an SRE prematurely, so it was back into the world of job hunting. Very scary: the day I got my layoff notice, the recruiters stopped calling, because the talent pool became an ocean with so many IT people being let go. My SRE team got cut 40%, and others I thought were critical and un-cut-able are looking for work.

Luckily, I had maintained contact with a friend from a previous contract (at yet another major media company), and he turned me onto an opportunity in financial technology. I’m now working as a DevOps engineer (I know, it’s not a job title, but it is here) in Azure for a well-known bank. That is something of a sea change for me, but I’m looking forward to it, and it is a proper J.O.B. and not a contract, so it is nice to have some semblance of security. I’m certainly blessed and grateful to be so lucky in these times of trial. Watching LinkedIn and Twitter has been really depressing for the last three months, as so many of my peers are scrambling to keep working while tech companies fold all around us. Time to focus, improve my skills, and keep swinging for the fences.

SSM Parameter Store

As part of my disaster recovery process, I make a daily AMI snapshot of the servers and copy it to my disaster recovery target region. Since that AMI ID changes each day, I need a way to get the current day’s ID into my CloudFormation template so that the AMI can be used when creating a copy of our server in the new region. Short of using Python and Lambda to discover the AMI ID and regenerate the template every day, there had to be a better way.

Enter Parameter Store.

This is an AWS service that acts like a region-bound scratchpad: you can store data in it and have that data retrieved by a handful of other services, one of which is CloudFormation.
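
As a minimal sketch of the workflow (the parameter name and AMI ID here are made up), writing and then reading a value with boto3 looks something like this:

import boto3

ssm = boto3.client('ssm', region_name='us-west-2')

# store a plain-text value under a path-style name
ssm.put_parameter(Name='/org/env/example/ami', Value='ami-0123456789abcdef0',
                  Type='String', Overwrite=True)

# read it back
response = ssm.get_parameter(Name='/org/env/example/ami')
print(response['Parameter']['Value'])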

My first step is to create the AMIs and store their IDs in SSM:

# snippet: local_ami_list is a list of local instance IDs and names, with the date prepended.
# this section creates the AMIs, tags them, and adds them to a list for copying to DR
for line in local_ami_list:
    image_data_combined_list = line.split(',')
    #pprint(image_data_combined_list)
    local_instance_id = image_data_combined_list[0]
    local_instance_name = current_date_tag + '-' + image_data_combined_list[1]
    image = ec2_local.create_image(InstanceId=local_instance_id, Description=local_instance_name, DryRun=False,
                                    Name = local_instance_name, NoReboot=True)
    tag_image = ec2_local.create_tags(Resources=[image['ImageId']], Tags=[{'Key': 'Name', 'Value': local_instance_name},])
    entry = local_instance_name + ',' + image['ImageId']
    ami_list_to_copy.append(entry)

sleep(90)

# this snippet copies the AMIs to the DR region

for line in ami_list_to_copy:
    ami_list_combined_data = line.split(',')
    local_ami_name = ami_list_combined_data[0]
    local_ami_id = ami_list_combined_data[1]
    try:
        image_copy = ec2_dr.copy_image(Description=local_ami_name, Name=local_ami_name, SourceImageId=local_ami_id,
                                       SourceRegion=local_region, DryRun=False)
        entry = local_ami_name + ',' + image_copy['ImageId']
        dr_ami_list.append(entry)
    except botocore.exceptions.ClientError as error:
        print('error: {0}'.format(error))

# this next snippet is a bit of a kludgy hack, but it gets me in the ballpark. Anonymized for my protection

# 5. lists AMIs in the DR region and writes the current day's IDs to SSM parameters for further use in CF scripts

sv1_ami_parameter = '/org/env/ec2/ServerName1/ami'
sv2_ami_parameter = '/org/env/ec2/ServerName2/ami'
sv3_ami_parameter = '/org/env/ec2/ServerName3/ami'
sv4_ami_parameter = '/org/env/ec2/ServerName4/ami'
sv5_ami_parameter = '/org/env/ec2/ServerName5/ami'
current_ami_list = []

ssm_dr = boto3.client('ssm',region_name=dr_region)
dr_amis = ec2_dr.describe_images(Owners=['self'])
for ami in dr_amis['Images']:
    match = re.search(current_date_tag, str(ami['Name']))
    if match:
        entry = str(ami['Name']) + ',' + str(ami['ImageId'])
        current_ami_list.append(entry)

for ami in current_ami_list:
    line = ami.split(',')

    match1 = re.search('ServerName1', line[0])
    match2 = re.search('ServerName2', line[0])
    match3 = re.search('ServerName3', line[0])
    match4 = re.search('ServerName4', line[0])
    match5 = re.search('ServerName5', line[0])
    if match1:
        set_parameter = ssm_dr.put_parameter(Name=sv1_ami_parameter,
                                            Value=line[1],
                                            Type='String',
                                            Overwrite=True)
    elif match2:
        set_parameter = ssm_dr.put_parameter(Name=sv2_ami_parameter,
                                            Value=line[1],
                                            Type='String',
                                            Overwrite=True)
    elif match3:
        set_parameter = ssm_dr.put_parameter(Name=sv3_ami_parameter,
                                            Value=line[1],
                                            Type='String',
                                            Overwrite=True)

    elif match4:
        set_parameter = ssm_dr.put_parameter(Name=sv4_ami_parameter,
                                            Value=line[1],
                                            Type='String',
                                            Overwrite=True)

    elif match5:
        set_parameter = ssm_dr.put_parameter(Name=sv5_ami_parameter,
                                            Value=line[1],
                                            Type='String',
                                            Overwrite=True)
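
For what it’s worth, the match-per-server block above could probably be collapsed into a dictionary lookup. A sketch of the same logic, assuming the five server names stay fixed:

# sketch: map each server name to its parameter path, then loop once
ami_parameters = {
    'ServerName1': sv1_ami_parameter,
    'ServerName2': sv2_ami_parameter,
    'ServerName3': sv3_ami_parameter,
    'ServerName4': sv4_ami_parameter,
    'ServerName5': sv5_ami_parameter,
}

for ami in current_ami_list:
    ami_name, ami_id = ami.split(',')
    for server_name, parameter_name in ami_parameters.items():
        if server_name in ami_name:
            ssm_dr.put_parameter(Name=parameter_name, Value=ami_id,
                                 Type='String', Overwrite=True)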

As you can see, the put_parameter call on the boto3 SSM client stores the value as plain text. To retrieve it in a CloudFormation template, you reference it as a parameter:

Parameters:

  sv1:
    Description:  'pre-baked AMI copied from ops region, ID retrieved from SSM Parameter Store'
    Type: 'AWS::SSM::Parameter::Value<String>'
    Default: '/org/env/ec2/ServerName1/ami'

# then reference it in the Resources ec2 instance code block as the ImageId.
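
For example, a minimal sketch of that Resources block (the logical name and instance type here are illustrative):

Resources:
  ServerName1Instance:
    Type: AWS::EC2::Instance
    Properties:
      ImageId: !Ref sv1        # resolves to the AMI ID stored in Parameter Store
      InstanceType: t3.medium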

No more hard-coded values, and no need to dynamically regenerate the template every day.

You can also encrypt values and store them as SecureStrings. However, to retrieve them you need an understanding of the parameter version number, and I’ve yet to figure that out. Once I do, I can store usernames and passwords more securely and avoid hard coding them. So, very cool indeed! (And now I know the answer to an interview question that I bombed!)
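
From what I’ve read so far, the retrieval side for SecureStrings goes through CloudFormation dynamic references, roughly like this (the parameter path and version number are hypothetical, and I haven’t tested it yet):

Resources:
  Database:
    Type: AWS::RDS::DBInstance
    Properties:
      # the trailing '1' is the parameter version
      MasterUserPassword: '{{resolve:ssm-secure:/org/env/rds/MasterPassword:1}}'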

First Jenkins success: building the disaster recovery stacks

Now that I have most of the disaster recovery scripts built (still testing and solving little issues, but getting close!), I thought I’d give Jenkins a try. I built a freestyle job with the AWS CloudFormation plugin, pointed it at the root stack in S3, and voila! It builds. A few things still to figure out:

  • I’m not having any luck auto-building from commits. I need to work out a process that pulls the code and syncs it to my S3 bucket so that Jenkins can build it.
  • As I am using a Jenkins container, I don’t have the AWS CLI installed and can’t script the commands. So far, building my own Jenkins image with all the tools baked in seems the way to go, but if there is a CloudFormation plugin, there should be something similar to what I need. That, or build it myself (hmm…..).
  • I can’t get an update to work through Jenkins. The build throws an error about a badly formatted template when I try an update, but that same update works fine if I do it manually in the console. If I delete the stack and build again, it works fine. Weird….

So, more troubleshooting ahead. I’m still working on why the load-balanced instances keep initializing and then getting replaced; I think it is a health check setting… Back at it!

python : amiCreateCopyRotate.py

This script creates an AMI of each running instance, tags them with a datestamp, copies them to a disaster recovery region, and trims images that are 7 days old. The script needs some refactoring, but it works at the moment. I would like to decouple it a bit so that the sleep timers can be removed (a possible waiter-based approach is sketched after the script).

#!/usr/bin/python

import boto3
import botocore
from datetime import datetime, timedelta
from time import sleep
import re

# variables

local_region = 'us-west-2'
dr_region = 'us-east-1'

current_date = datetime.now().today()
last_week = current_date - timedelta(days=7)
current_date_tag = str(current_date.strftime("%Y-%m-%d"))
last_week_date_tag = str(last_week.strftime("%Y-%m-%d"))

local_ami_list = []
dr_ami_list = []
ami_list_to_copy = []

ec2_local = boto3.client('ec2', region_name=local_region)
ec2_dr = boto3.client('ec2', region_name=dr_region)


# 1. pull list of running instances in us-west-2

try:
    local_instances = ec2_local.describe_instances()
    for key in local_instances['Reservations']:
        for instance in key['Instances']:
            if instance['State']['Name'] == 'running':
                local_instance_id = instance['InstanceId']
                local_instance_tags= instance['Tags'][0]
                local_instance_name = str(local_instance_tags.get('Value'))
                entry = local_instance_id + ',' + local_instance_name
                local_ami_list.append(entry)
            else:
                pass

except botocore.exceptions.ClientError as error:
    print('error: {0}'.format(error))


# 2. Creates an AMI of each instance and tags it with the current date and name

for line in local_ami_list:
    image_data_combined_list = line.split(',')
    #pprint(image_data_combined_list)
    local_instance_id = image_data_combined_list[0]
    local_instance_name = current_date_tag + '-' + image_data_combined_list[1]
    image = ec2_local.create_image(InstanceId=local_instance_id, Description=local_instance_name, DryRun=False,
                                   Name = local_instance_name, NoReboot=True)

    entry = local_instance_name + ',' + image['ImageId']
    ami_list_to_copy.append(entry)

sleep(90)


# 3. Copies the AMIs to the DR region us-east-1

for line in ami_list_to_copy:
    ami_list_combined_data = line.split(',')
    local_ami_name = ami_list_combined_data[0]
    local_ami_id = ami_list_combined_data[1]
    try:
        image_copy = ec2_dr.copy_image(Description=local_ami_name, Name=local_ami_name, SourceImageId=local_ami_id,
                                       SourceRegion=local_region, DryRun=False)
        entry = local_ami_name + ',' + image_copy['ImageId']
        dr_ami_list.append(entry)

    except botocore.exceptions.ClientError as error:
        print('error: {0}'.format(error))

sleep(90)


# 4. Pulls a list of current private AMIs in us-west-2 and drops AMIs that are tagged 7 days older

local_amis_to_prune = ec2_local.describe_images(Owners=['self'])
local_amis = local_amis_to_prune['Images']
for ami in local_amis:
    entry = str(ami['Name']) + ',' + str(ami['ImageId'])
    match = re.search(last_week_date_tag,entry)
    if match:
        ec2_local.deregister_image(ImageId=ami['ImageId'])
        #print('deleting: ', ami['Name'])
    else:
        pass
        #print('not deleting', ami['Name'])

# 5. same for dr region

remote_amis_to_prune = ec2_dr.describe_images(Owners=['self'])
remote_amis = remote_amis_to_prune['Images']
for ami in remote_amis:
    entry = str(ami['Name']) + ',' + str(ami['ImageId'])
    match = re.search(last_week_date_tag,entry)
    if match:
        ec2_dr.deregister_image(ImageId=ami['ImageId'])
        #print('deleting: ', ami['Name'])
    else:
        pass
        #print('not deleting', ami['Name'])
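
One possible way to drop the fixed sleep(90) calls is boto3’s built-in waiters; a sketch of the idea (not in the script above yet):

# sketch: block until the new AMIs are actually available instead of sleeping a fixed 90 seconds
new_image_ids = [line.split(',')[1] for line in ami_list_to_copy]
waiter = ec2_local.get_waiter('image_available')
waiter.wait(ImageIds=new_image_ids, WaiterConfig={'Delay': 15, 'MaxAttempts': 40})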

Ansible vs Tf vs Cf: Disaster Recovery

At this point, I am working on my disaster recovery plan by developing templates of my school’s current resources. I have come to understand that CloudFormation is great if you are only working with AWS resources. Terraform works well with AWS but also with other cloud providers, so if you are multi-cloud or opposed to vendor lock-in, Terraform makes a lot of sense. As for Ansible… I can configure servers with it, but deploying cloud resources is not as easy as I would like, and I think that is why, everywhere I interview, CF and TF are used for Infrastructure as Code and Ansible is used for configuration management. However, I am still working on replicating my resource stacks in all three languages, just for the practice. Today, I am making some serious headway with CloudFormation.

My first task was to get a sense of what is deployed in my organization’s region. This was greatly aided by CloudFormer. I ran the template for the CloudFormer server, then changed it to a larger instance, since it seemed to be grinding on the size of our DNS record stacks. Once I did that, I walked through the interface and was rewarded with about 7K lines of YAML that declared everything that was built. I ran it two more times, splitting the DNS records (2,700 lines) from the main output file (4,600 lines). This gave me a good understanding of everything I had built in the console over the last three years. Now I have a baseline from which to build a copy for another region. I am also considering deploying it in the current region and rebuilding all the resources, then deleting the old ones, clearing out the cruft of experimentation.

The next step was to recreate the VPC stack. This began my first use of the !Ref intrinsic function to refer to the parameters I added to the template:

Parameters:

  EnvironmentName:
    Description: An environment name that will be prefixed to resource names
    Type: String
    Default: DR-Prod

  VpcCIDR:
    Description: Please enter the IP range (CIDR notation) for this VPC
    Type: String
    Default: 10.0.0.0/16

  PublicSubnet1CIDR:
    Description: Please enter the IP range (CIDR notation) for the public subnet in the first Availability Zone
    Type: String
    Default: 10.0.0.0/24

## truncated for example

Resources:
  VPC:
    Type: AWS::EC2::VPC
    Properties:
      CidrBlock: !Ref VpcCIDR
      EnableDnsSupport: true
      EnableDnsHostnames: true
      Tags:
        - Key: Name
          Value: !Ref EnvironmentName

  InternetGateway:
    Type: AWS::EC2::InternetGateway
    Properties:
      Tags:
        - Key: Name
          Value: !Ref EnvironmentName

  InternetGatewayAttachment:
    Type: AWS::EC2::VPCGatewayAttachment
    Properties:
      InternetGatewayId: !Ref InternetGateway
      VpcId: !Ref VPC

# subnets
  PublicSubnet1:
    Type: AWS::EC2::Subnet
    Properties:
      VpcId: !Ref VPC
      AvailabilityZone: !Select [ 0, !GetAZs '' ]
      CidrBlock: !Ref PublicSubnet1CIDR
      MapPublicIpOnLaunch: true
      Tags:
        - Key: Name
          Value: !Sub ${EnvironmentName} Public Subnet (AZ1)

# routing
  PublicRouteTable:
    Type: AWS::EC2::RouteTable
    Properties:
      VpcId: !Ref VPC
      Tags:
        - Key: Name
          Value: !Sub ${EnvironmentName} Public Routes

  DefaultPublicRoute:
    Type: AWS::EC2::Route
    DependsOn: InternetGatewayAttachment
    Properties:
      RouteTableId: !Ref PublicRouteTable
      DestinationCidrBlock: 0.0.0.0/0
      GatewayId: !Ref InternetGateway

  PublicSubnet1RouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Properties:
      RouteTableId: !Ref PublicRouteTable
      SubnetId: !Ref PublicSubnet1

## truncated for example

This also allowed me to use the !Select function to get the availability zones without having to name them explicitly, and the !Sub function to add some variety to my tags.

As I began to build other stacks, such as the security stack, I found that I needed to use dynamically generated values like resource IDs. That led me to the Outputs section, which I had only ever used to generate a URL as part of a tutorial.


Outputs:
  VPC:
    Description: A reference to the created VPC
    Value: !Ref VPC
    Export:
      Name: VPC

  PublicSubnet1:
    Description: A reference to the public subnet in the 1st Availability Zone
    Value: !Ref PublicSubnet1
    Export:
      Name: PublicSubnet1

Each named value is then available to other stacks in the region, so you can use the !ImportValue function to retrieve them in your follow-on scripts. Here’s part of the security stack, which handles security groups and ingress rules. Later I’ll add a set of NACLs, which will be good practice for my upcoming Networking Specialty exam.

Description: "This template applies the network security stack, implementing security groups, egress and ingress rules, network access control lists, sets up the bastion host, and applies the NAT and route rules for the private subnets."


Resources:

# security groups and ingress rules

# mdm security groups
  sgMdmProd:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: Mdm server in production
      VpcId: !ImportValue VPC
      Tags:
      - Key: Name
        Value: MdmProd

# all access 8443
  ingressMdmProd01:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      GroupId: !Ref sgMdmProd
      IpProtocol: tcp
      FromPort: '8443'
      ToPort: '8443'
      CidrIp: 0.0.0.0/0

# ldap 389 only to ldap server
  ingressMdmProd05:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      GroupId: !Ref sgMdmProd
      IpProtocol: tcp
      FromPort: '389'
      ToPort: '389'
      SourceSecurityGroupId: !Ref sgOdProd
      SourceSecurityGroupOwnerId: '<redacted>'

# truncated for examples

I exported my security groups so they can be used by other scripts that create resources requiring security groups. The best part about having the network stack separate is that I can look at it closely and make sure there are no extraneous rules, and that no traffic is coming in from anywhere I don’t want.
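
A sketch of what that pairing looks like, with the export in the security stack and the import in a later stack (the instance properties are illustrative):

# in the security stack
Outputs:
  sgMdmProd:
    Description: Security group for the MDM server
    Value: !Ref sgMdmProd
    Export:
      Name: sgMdmProd

# in a later stack that consumes it
Resources:
  MdmServer:
    Type: AWS::EC2::Instance
    Properties:
      ImageId: ami-0123456789abcdef0
      SecurityGroupIds:
        - !ImportValue sgMdmProd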

As my list of stacks got larger and larger, I learned about nested stacks and DependsOn conditions. This is my root stack example:

AWSTemplateFormatVersion: '2010-09-09'
Description: "disaster recovery / automatic failover"

Resources:
  DrVpc:
    Type: AWS::CloudFormation::Stack
    Properties:
      TemplateURL: https://s3.amazonaws.com/<redacted>/DrVpc.yaml
      TimeoutInMinutes: '5'
  
  DrSecGroupRules:
    Type: AWS::CloudFormation::Stack
    DependsOn: DrVpc
    Properties:
      TemplateURL: https://s3.amazonaws.com/<redacted>/DrSecurityRules.yaml
      TimeoutInMinutes: '5'

  DrRds:
    Type: AWS::CloudFormation::Stack
    DependsOn: DrSecGroupRules
    Properties:
      TemplateURL: https://s3.amazonaws.com/<redacted>/DrRds.yaml
      TimeoutInMinutes: '20'

Normally, CloudFormation will try to build all the stacks at the same time. If one stack requires the IDs of resources from another stack, the DependsOn directive will cause CloudFormation to wait to create that stack until after the required stack is finished.

The TimeoutInMinutes property is there so that I don’t have to wait an hour for a stack to fail. I had a situation where a resource in the VPC stack was not getting created, but not failing either. I think the default wait time in CloudFormation is 60 minutes before declaring a failure (I think that was a test question somewhere), so I lowered it to reduce the suspense. As I tested each stack individually, I took note of how long each stack of resources took to build, and set my timeouts accordingly. The RDS stack took the longest, with each database taking about 5 minutes to spin up, which is important to know 🙂

At present, I have stacks for the VPC, security, load balancers, EFS volumes, and databases. I’m working on the individual server stacks, trying to figure out how to get the necessary data from the backups onto the EFS volumes and into the databases. I may have to build some Lambda functions to auto-copy AMIs over and update the scripts with the AMI IDs. That bears some consideration, and probably some googling 🙂

My disaster recovery drill is scheduled for Friday night, so I have three more days to finish these CloudFormation templates. We’ll see how it goes, and as I learn about helper functions and init scripts I’ll post them here. Once the CloudFormation templates are done, it’s time to do it all over again in Terraform!

Passed the DevOps Pro exam!

I just got back from taking the test, and I am so relieved. That test was much harder than the Solutions Architect Professional exam, and even though I’ve been reading and practicing for a month, I didn’t score as high as I would have liked. Since I haven’t had the need to use Elastic Beanstalk or OpsWorks yet, I had trouble with the highly detailed questions about those services. The real exam was even more challenging than the practice tests on Whizlabs, so while those were useful preparation, they were no guarantee.

The testing conditions were much better this time, everything was snappy and no construction noise. There was an accident on the freeway that kept me parked for 30 minutes, which I used to review. The rain was pretty intense, worse when I got out. Definitely grateful to have made it to the testing center in one piece, and to have been able to study and pass this certification. Now, on to Jenkins and Ansible certs! 

Ansible vs Terraform vs CloudFormation

Studying for my AWS professional certifications introduced me to the wonderfully difficult-to-read world of JSON and CloudFormation templates. Once I discovered you can write them in YAML, they became a bit less intimidating, and on the whole pretty amazing. CF templates were my first introduction to Infrastructure as Code, and it has been a lot of fun trying to reproduce my school’s infrastructure in another region using them. I also had a chance to work with them on my log aggregation project, and wrote a template to install my log transport function, which was pretty cool.

I picked up Terraform while applying for a job that listed it as a requirement, and I found it easier to break into than CF templates, mainly because the syntax and variables were easier to read than straight JSON (I didn’t know about YAML CF templates until later), and I could comment the Terraform templates easily, whereas you can’t comment a JSON file. So, most of my IaC scripts are in Terraform.

I just started working with Ansible to solve a server deployment issue at my school, where I am deploying a container cluster. I seized on Ansible to manage the configurations, and it made deploying the five servers a snap.

What I didn’t realize at the time is that Ansible can be used for so much more than configuration management. To paraphrase an old Steve Martin joke, it’s like Ansible has a module for everything! I had a friend who uses Ansible in production take a look at my first playbook, and he showed me that instead of treating it like a glorified init script, Ansible could create resources and deploy containers as easily as Terraform and CloudFormation.

Mind blown!
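
For example, a resource-creating task looks roughly like this (a sketch using the amazon.aws collection; the instance details are made up):

- name: launch a server from an AMI
  amazon.aws.ec2_instance:
    name: dr-test-server
    image_id: ami-0123456789abcdef0
    instance_type: t3.medium
    region: us-west-2
    tags:
      Environment: DR-Prod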

So, my challenge is to create all of my IaC code in each flavor (Ansible, Terraform, and CloudFormation) and build automation pipelines for each. As I get them built and tested, I’ll post anonymized versions here. What an adventure!

EFS Provisioned Throughput

I always see recommendations to avoid EFS when running web servers, even though it looks like a much better solution than copying documents back and forth to S3. Our public and private web servers have always been particularly low volume, so I never really noticed unacceptable lag; if our pages load in a couple of seconds, not a problem. However, over the last couple of weeks our web server has been performing dog slow. At first I thought it was an Apache 2.4 tuning problem, and I was wracking my brain trying different KeepAlive and MinServer directive values, to no avail. I also suspected a plugin problem (when in WordPress, beware the plugin), but the penny didn’t drop until I ran du -h -d 1 to see how much space the plugins were taking up (in case one had blown up) and the command took forever to complete. Ah ha!

The original EFS mount was set to ‘bursting’ throughput mode, so clearly we were exhausting the burst throughput with just a couple of servers. I set the file system to ‘provisioned’ throughput at 10 MiB/s, and that solved the latency problem. It will cost us about $60 a month (cheap for you all, but that’s real money for us charter school people), but now when I stress test the web server with JMeter and 400 simultaneous users, the system barely notices, and scales out accordingly. Provisioned throughput can scale to 1024 MiB/s, so I imagine that can be pretty beefy for a large scaled service. Now I can go get some sleep!
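
If I end up scripting the change instead of clicking through the console, boto3 has an update call for it. A sketch (the file system ID is made up):

import boto3

efs = boto3.client('efs', region_name='us-west-2')

# switch the file system from bursting to 10 MiB/s of provisioned throughput
efs.update_file_system(FileSystemId='fs-0123456789abcdef0',
                       ThroughputMode='provisioned',
                       ProvisionedThroughputInMibps=10.0)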

CodeCommit : example

This is a visualizer snapshot of the main repo my current organization uses to organize their code, showing the recent commits and branch merges.

lambda : python3 : log filter and transport serverless function

This Lambda function is deployed via a CloudFormation template. It is composed of the following parts:

  • A YAML config file that is sourced from an S3 bucket at execution time. It includes instructions on how to build job rules to extend the function’s capabilities.
  • A library of filtering, parsing, and messaging functions.
  • A simple lambda_function handler that is triggered by SQS messages posted when a log file lands in an S3 bucket.
  • A CloudFormation template.

The function handles 10 messages at a time, filters out files we don’t want, identifies file types we do want, wraps them as syslog messages, then compresses each file and posts it to a bucket, which gets pulled on premises by a local worker script.

The YAML Config File

# This document is an abstraction of the pattern match and job function logic of the aws_logs_filter.py lambda function.
#
# The 'job' rules define how the aws-logs-filter.py function will parse logs.
#
# Each job is a log type that is received from AWS accounts that have implemented log aggregation and forwarding
#
# Each log is tested for gzip compression, unzipped if necessary, and loaded into memory. All files are treated as
# plain text files for the purposes of job definition.
#
# The data structure is as follows:
# job##: a simple name for the rule. Must be unique, but not necessarily sequential
# name: a plain language name for the job that is not processed by the function
# match_variable: a pattern that the function will use to identify the log
# service_variable: a service that will be added to the syslog message to determine the log type.
# action_variables: an ordered list of steps the function will take to process the log.
#
# The callable methods work as follows:
#   json: treats the file as a json object on each line
#   firehose: json objects on a single line. The method looks for touching braces "}{" and breaks them into lines "}\n{"
#   text_wrap: handles text logs regardless of form, wraps them as a syslog message
#
# all logs that meet the job rules get parsed, gzipped and sent to the filtered s3 bucket.
# all logs that fail to get parsed are written to the region's DynamoDB table, AWSLogsFilterExceptions
#
# Periodic review of the table will show logs that are not being processed, and a new set of job rules can be
# added to this file. The configuration will be updated on the next execution of the lambda function.
#
# Periodic review of the dead letter queue will also be useful for identifying logs that are not being processed, but these
# should be a rare exception. We will need to ascertain why the log was not written to the database table.
#
# This file is kept in the s3://<redacted> bucket, which supports versioning. If a change is made that
# breaks the lambda functions, you can restore the previous version using the AWS S3 management console.
#
jobs:
    job01 :
        name: cloudtrail
        match_variable: cloudtrail/
        service_variable: 'cloudtrail.amazonaws.com'
        action_variables:
           - json

    job02 :
        name: rds
        match_variable: rds/
        service_variable: 'rds.amazonaws.com'
        action_variables:
           - text_wrap

    job03 :
        name: firehose
        match_variable: firehose/
        service_variable: 'cloudwatch.amazonaws.com'
        action_variables:
           - firehose
           - json

    job04 :
        name: config
        match_variable: config/
        service_variable: 'config.amazonaws.com'
        action_variables:
           - json

    job05 :
        name: flowlog
        match_variable: flowlog/
        service_variable: 'vpc.amazonaws.com'
        action_variables:
           - text_wrap

    job06 :
        name: elb
        match_variable: elasticloadbalancing/
        service_variable: 'elb.amazonaws.com'
        action_variables:
           - text_wrap

    job07 :
        name: serveraccesslogs
        match_variable: serveraccesslogs/
        service_variable: 'ec2.amazonaws.com'
        action_variables:
           - text_wrap

    job08 :
        name: mq
        match_variable: mq/
        service_variable: 'mq.amazonaws.com'
        action_variables:
           - text_wrap

    job09 :
        name: redshift
        match_variable: redshift/
        service_variable: 'redshift.amazonaws.com'
        action_variables:
           - text_wrap

    job10 :
        name: cloudfront
        match_variable: cloudfront/
        service_variable: 'cloudfront.amazonaws.com'
        action_variables:
           - text_wrap

    job11 :
        name: ec2
        match_variable: ec2/
        service_variable: 'ec2.amazonaws.com'
        action_variables:
           - json

    job12 :
        name: emr
        match_variable: elasticmapreduce/
        service_variable: 'elasticmapreduce.amazonaws.com'
        action_variables:
           - text_wrap

    job13 :
        name: elb2
        match_variable: elb/
        service_variable: 'elb.amazonaws.com'
        action_variables:
           - text_wrap

    job14 :
        name: etl
        match_variable: etl/
        service_variable: 'glue.amazonaws.com'
        action_variables:
           - text_wrap

    job15 :
        name: s3
        match_variable: s3/
        service_variable: 's3.amazonaws.com'
        action_variables:
           - text_wrap

    job16:
        name: aurora
        match_variable: aurora/
        service_variable: 'rds.amazonaws.com'
        action_variables:
        - text_wrap

# filter variable. These are key path patterns that we drop to avoid redundant logs and files that lock up the parser
FILTERS:
    - CloudTrail-Digest
    - ConfigWritabilityCheckFile
    - DS_Store
    - .trm
    - .trc
    - <example key path>
    - <rest removed for privacy>

# aggregate_accounts variable for those that combine their logs in one account before replication. Note in key path where
# actual account number shows, split by '/'
aggregate_accounts:
    agg_acct01:
        name: <redacted1>
        agg_account: '<example1>'
        match_position: '1'
    agg_acct02:
        name: <redacted2>
        agg_account: '<redacted2>'
        match_position: '2'
    agg_acct03:
        name: <redacted3>
        agg_account: '<redacted3>'
        match_position: '2'

The Library File

#!/usr/local/bin/python3
"""
DESCRIPTION
Lambda function to process sqs events, filter messages based on key paths,
load filtered messages into memory,parse formats to syslog,
and write syslog messages to a gzipped s3 object, and drop the event generating sqs message.
"""

import re
import ast
import gzip
import json
from datetime import datetime
import urllib
import boto3
import botocore
import os
import yaml
import math
import uuid


REGION = os.environ.get('AWS_REGION')


# setup vars
config_bucket = '<redacted>'
yaml_config = 'config.yaml'
s3 = boto3.client('s3')
config_handle = s3.get_object(Bucket=config_bucket, Key=yaml_config)
config_data = config_handle.get('Body').read()
yaml_data = yaml.safe_load(config_data)
FILTERS = yaml_data['FILTERS']
aggregate_accounts = yaml_data['aggregate_accounts']


class Message:
    """ Pull objects and variables plus event data. """

    def __init__(self, sqs_object, record, in_queue, out_queue):
        """ receives a single record from event. """
        self.sqs = sqs_object
        self.message_id = record["messageId"]
        self.receipt_handle = record["receiptHandle"]
        # not in testing
        body = ast.literal_eval(record["body"])
        self.msg_key_path = body["Records"][0]["s3"]["object"]["key"]
        # self.msg_key_path = record["body"]["Records"]["s3"]["object"]["key"]
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.tag = 'message: initialized'
        #print('key: ', self.msg_key_path)

    def filter_message(self):
        """
        filters keypath against list to drop files we don't want
        """
        return_value = ''
        global FILTERS
        for my_filter in FILTERS:
            if re.search(my_filter, self.msg_key_path):
                #print('matched: ', self.msg_key_path)
                self.drop_message()
                self.tag = 'message: filtered'
                return_value = True
        if return_value:
            return True
        else:
            #print('file not matched: ', self.msg_key_path)
            return False

    def drop_message(self):
        """
        removes sqs msg from queue after successful file processing
        """
        try:
            self.sqs.delete_message(
                QueueUrl=self.in_queue,
                ReceiptHandle=self.receipt_handle
            )
            return True

        except botocore.exceptions.ClientError as error:
            exception_string = 'Raised exception: error: {0}'.format(error) + '\n'
            print('can not drop message:', exception_string)
            return False

    def write_sqs_message_to_out_queue(self, sqs_object, out_queue, data):
        """
        Writes a message to specified / dlq queue. Have to replace single quotes to meet json formatting
        """

        try:
            replace_single_quotes = re.sub('\'', '\"', data)
            sqs = sqs_object
            sqs.send_message(
                QueueUrl=out_queue,
                MessageBody=replace_single_quotes,
            )
            return True

        except botocore.exceptions.ClientError as error:
            exception_string = 'Raised exception: error: {0}'.format(error) + '\n'
            print('can not write message to out_queue:', exception_string)
            return False


class File:

    """ Processes s3 object from msg_key_path. """
    def __init__(self, s3_object, msg_key_path, in_bucket, out_bucket):
        self.s3 = s3_object
        self.msg_key_path = msg_key_path
        self.in_bucket = in_bucket
        self.out_bucket = out_bucket
        self.service_name = ''
        self.owner_id = ''
        self.tag = 'file: initialized'

# check these
    def parse_body_json_to_syslog(self):
        output_data = ''
        accountID = 'account-' + self.owner_id
        my_time = datetime.now().strftime('%Y-%m-%dT%H:%M:%S%Z')
        service_name = self.service_name
        my_data = str(self.body).split('\n')
        self.datetime = my_time

        for data in my_data:
            # parse each line into its own variable instead of rebinding the list we are iterating over
            parsed = json.loads(data)
            message = my_time + ' ' + str(accountID) + ' ' + str(service_name) + ' ' + str(parsed) + '\n'
            output_data += message
        self.body = output_data
        return True

    def determine_if_agg_account(self):
        split_path = self.msg_key_path.split('/')
        # default: the account number is the first element of the key path
        self.owner_id = split_path[0]
        for key in aggregate_accounts.keys():
            # look for the aggregate account number in the key path (pattern first, string second)
            match = re.search(aggregate_accounts[key]['agg_account'], self.msg_key_path)
            if match:
                self.owner_id = split_path[int(aggregate_accounts[key]['match_position'])]
        return self.owner_id

    def determine_service_name(self, service_name):
        self.service_name = service_name
        return True

    def fix_body_firehose(self):
        data = re.sub('}{', '}\n{', self.body)
        self.tag = 'body: each line json'
        self.body = data
        return True

    def parse_ascii_body_to_syslog(self):
        output_data = ''
        accountID = 'account-' + self.owner_id
        my_time = datetime.now().strftime('%Y-%m-%dT%H:%M:%S%Z')
        service_name = self.service_name
        self.datetime = my_time
        output_data = my_time + ' ' + str(accountID) + ' ' + str(service_name) + ' ' + str(self.body)
        self.body = output_data
        return True

# these are reliable
    def load_s3_object_to_body(self):
        """
       Loads the file's contents to memory for manipulation
        """
        self.msg_key_path = urllib.parse.unquote(self.msg_key_path)
        file_handle = self.s3.get_object(Bucket=self.in_bucket, Key=self.msg_key_path)
        try:
            self.body = file_handle.get('Body').read()
            self.tag = 'body: loaded'
            return True
        except botocore.exceptions.ClientError as error:
            exception_string = 'Raised exception: error: {0}'.format(error) + '\n'
            print('can not load file from path: ', self.msg_key_path, ' ', exception_string)
            return False

    def put_s3_object(self):
        """ Write loaded data to filtered bucket. """
        try:
            self.s3.put_object(Bucket=self.out_bucket, Key=self.msg_key_path, Body=self.body)
            return True
        except:
            print('could not put: ', self.msg_key_path)
            return False

    def check_body_is_gzip(self):
        try:
            test_gzip = gzip.decompress(self.body)
            self.tag = 'body: gzip'
            return True
        except:
            return False

    def check_body_is_ascii(self):
        try:
            data = self.body.decode()
            test_ascii = isinstance(data, str)
            if test_ascii:
                self.tag = 'body: ascii'
                return True
        except:
            return False

    def gunzip_body(self):
        data = gzip.decompress(self.body)
        self.body = data.decode('utf-8')
        self.tag = 'body: unzipped'
        return True

    def gzip_body(self):
        data = self.body
        encoded_data = data.encode('utf-8')
        self.body = gzip.compress(encoded_data)
        self.tag = 'body: compressed to gz'
        return True

    def add_syslog_suffix_to_path(self):
        self.msg_key_path = self.msg_key_path + '.syslog'
        self.tag = 'file: .syslog'
        return True

    def add_gz_suffix_to_path(self):
        self.tag = 'file: .gz'
        self.msg_key_path = self.msg_key_path + '.gz'
        return True

    def strip_path_of_suffix(self):
        try:
            # escape the dots so we strip literal suffixes, not arbitrary characters
            self.msg_key_path = re.sub(r'\.gz', '', self.msg_key_path)
            self.msg_key_path = re.sub(r'\.json', '', self.msg_key_path)
            self.msg_key_path = re.sub(r'\.syslog', '', self.msg_key_path)
            return True
        except:
            return False

    def buffer_msg_size(self):
        content_length = len(self.body)
        content = self.body.split('\n')
        owner_id = self.owner_id
        service_name = self.service_name
        my_timestamp = self.datetime
        main_body = ''

        for line in content:
            line_length = len(line)
            if line_length < 8000:
                main_body += line + "\n\n"
            else:
                my_uuid = self.my_random_sequence()
                part_count = math.ceil(line_length / 8000)
                print('part_count: ', part_count)
                for x in range(0, part_count):
                    if x == part_count - 1:
                        # last part: take whatever remains of the line
                        submessage = line[(x * 8000):]
                    else:
                        submessage = line[(x * 8000):((x + 1) * 8000)]
                    submessage = str(my_timestamp) + " account-" + str(owner_id) + " " + service_name + " " + submessage + " sequence " + str(my_uuid) + " part " + str(x + 1) + " of " + str(part_count) + "\n\n"
                    main_body += submessage
        self.body = main_body
        try:
            self.transport_syslog_body_to_gz_s3()
        except botocore.exceptions.ClientError as error:
            exception_string = 'Raised exception: error: {0}'.format(error) + '\n'
            print('can not write message to out_queue:', exception_string)

    def parse_exception(self):
        # can let message process 4 times and fail to dlq, or copy directly to dlq, &/or write to ddb table
        line_split = self.msg_key_path.split("/")
        accountID = line_split[0]
        now = datetime.now().today().strftime("%Y-%m-%dT%H:%M:%S:%f")
        ddb = boto3.client('dynamodb')
        table_name = 'AwsLogsFilterExceptions'
        if self.check_body_is_gzip() == True:
            self.gunzip_body()
        # limiting body message to < 400kb, max limit of ddb entry
        self.body = str(self.body[0:3000])
        item = {
            'accountID': {'S': accountID},
            'eventDateTime': {'S': now},
            'region': {'S': REGION},
            'msg_key_path': {'S': self.msg_key_path},
            'tag': {'S': self.tag},
            'body_data': {'S': self.body},
        }

        try:
            ddb.put_item(
                TableName=table_name,
                Item=item
            )
            return True
        except botocore.exceptions.ClientError as error:
            print('can not write to ddb' + 'error: {0}'.format(error))
            return False

    def my_random_sequence(self,string_length=5):
        sequence = str(uuid.uuid4()).upper()
        return sequence[0:string_length]

    def transport_syslog_body_to_gz_s3(self):
        self.add_syslog_suffix_to_path()
        self.gzip_body()
        self.add_gz_suffix_to_path()
        self.put_s3_object()
        return True

The Lambda Handler

def lambda_handler(event, context):
    """
    Flow control for aws-logs-filter.
    """
    import aws_logs_filter
    import boto3
    import botocore
    import re
    import os
    import yaml


    # setup vars
    config_bucket = '<redacted>'
    yaml_config = 'config.yaml'
    s3 = boto3.client('s3')
    config_handle = s3.get_object(Bucket=config_bucket, Key=yaml_config)
    config_data = config_handle.get('Body').read()
    yaml_data = yaml.safe_load(config_data)

    REGION = os.environ.get('AWS_REGION')
    FILTERS = yaml_data['FILTERS'] # TODO see if this use of FILTERS is necessary, called as function in aws_logs_filter
    jobs = yaml_data['jobs']
    aggregate_accounts = yaml_data['aggregate_accounts']

    in_bucket = '<redacted>' + REGION
    out_bucket = '<redacted>-filtered-' + REGION

    sqs = boto3.client('sqs')
    in_queue = 'https://sqs.' + REGION + '.amazonaws.com/<redacted>/Log-Aggregation-Queue'
    out_queue = 'https://sqs.' + REGION + '.amazonaws.com/<redacted>/Log-Aggregation-Dead-Letter-Queue'

    ddb = boto3.client('dynamodb')
    table_name = 'AwsLogsFilterExceptions'

    for record in event['Records']:

        # reset per record so a match on an earlier message does not mask a later failure
        sent_flag = 'no'

        try:
            message = aws_logs_filter.Message(sqs, record, in_queue, out_queue)
            # print(message.tag)
        except botocore.exceptions.ClientError as error:
            exception_string = 'Raised exception: error: {0}'.format(error) + '\n'
            print('can not retrieve message:', exception_string)

        try:
            filtered_result = message.filter_message()
            # print(message.tag)
        except botocore.exceptions.ClientError as error:
            exception_string = 'Raised exception: error: {0}'.format(error) + '\n'
            print('can not filter:', exception_string)

        # simple filter function, mostly relevant to us-west-2, but some global entries
        if filtered_result is True:
            print('dropped message: ', message.msg_key_path)
            message.drop_message()

        # main processing flow control
        if filtered_result is False:
            print('loading message:', message.msg_key_path)
            try:
                file = aws_logs_filter.File(s3, message.msg_key_path, in_bucket, out_bucket)
                file.load_s3_object_to_body()
            except:
                print('could not load file into memory')
                break
                # could result in dlq messages from unprocessed from break.
                # TODO fix so this fails gracefully

            try:
                if file.check_body_is_gzip() == True:
                    file.gunzip_body()
                    file.strip_path_of_suffix()
                elif file.check_body_is_gzip() == False:
                    file.check_body_is_ascii()

            except:
                print('file is unparsable: ', file.msg_key_path)
                file.parse_exception()

            for key in jobs.keys():
                file.determine_if_agg_account()
                #print('account id: ',file.owner_id)
                match_variable = jobs[key]['match_variable']
                action_variables = jobs[key]['action_variables']
                service_variable = jobs[key]['service_variable']
                # TODO make sure ownerID variables are in place


                match = re.search(match_variable, message.msg_key_path, re.IGNORECASE)

                if match:
                    file.determine_service_name(service_variable)
                    print("service: ", file.service_name)
                    action_count = len(action_variables)
                    #print('action_count: ', action_count)
                    for i in range(0, action_count):
                        #print(action_variables[i])

                        if action_variables[i] == 'firehose':
                            file.strip_path_of_suffix()
                            file.fix_body_firehose()

                        elif action_variables[i] == 'json':
                            file.strip_path_of_suffix()
                            file.parse_body_json_to_syslog()

                        elif action_variables[i] == 'text_wrap':
                            file.parse_ascii_body_to_syslog()
                        #print('end of action #', i)
                    file.buffer_msg_size()
                    sent_flag = "yes"

            if sent_flag == "yes":
                print('sent: ', sent_flag)
            else:
                print('write to ddb as error')
                result = file.parse_exception()
                if result is True:
                    message.drop_message()
                # if false, no db entry, message goes to dead letter for further processing

The CloudFormation Template

AWSTemplateFormatVersion: 2010-09-09
Description: >-
    Creates an instance of aws_logs_filter.py for filtering, wrapping and transporting
    log files so they can be pulled on-premise for further processing.

Parameters:
    LambdaCodeBucket:
        Description: The bucket where you uploaded the zip file
        Type: String
    LambdaCodeFile:
        Description: the name of the zip file you uploaded
        Type: String
    FunctionInstanceName:
        Description: Name of this instance of the function (must be unique).
        Type: String
Resources:
    AWSLogsFilter:
        Type: AWS::Lambda::Function
        Properties:
            Code:
                S3Bucket:
                    Ref: LambdaCodeBucket
                S3Key:
                    Ref: LambdaCodeFile
            FunctionName:
                Ref: FunctionInstanceName
            Handler: lambda_function.lambda_handler
            MemorySize: 512
            Role: arn:aws:iam::<redacted>:role/lambda_filter_log_agg_sqs_queue
            Timeout: 900
            Description: >-
                pulls log file, wraps as syslog, segments and sequences long messages
                and puts compressed file to filter bucket
            Runtime: python3.6
    LambdaFunctionEventSourceMapping:
        Type: AWS::Lambda::EventSourceMapping
        Properties:
            BatchSize: 10
            Enabled: true
            EventSourceArn:
                Fn::Join:
                    - ''
                    - 
                      - 'arn:aws:sqs:'
                      - Ref: 'AWS::Region'
                      - ':<redacted>:Log-Aggregation-Queue'
            FunctionName: 
                Ref: AWSLogsFilter