Streaming Large Objects from S3 with Golang

AWS S3 is an industry-leading object storage service, typically used to store images, videos, logs, and other files. A single object can be as large as 5TB. You can download any S3 object through a URL and do whatever processing is needed.

Downloading a file and processing it entirely in memory works up to a certain size, but it stops being practical for files larger than about 1GB unless you have unlimited resources. And since most workloads run inside managed containers with constrained resources, we need a way of streaming the file from S3.

A few months back, I had to process a 30GB file stored in S3 with Go. It was a JSON Lines file, where every line is a valid JSON object. Each object had to be stored in a Redis DB after some basic processing.

Download Objects from S3

Using the AWS SDK for Go to get an object is straightforward.

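package main

import (
	"io"
	"log"
	"strings"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)

func main() {
	handleErr := func(e error) {
		if e != nil {
			log.Fatal(e)
		}
	}
	// Initialize the S3 session.
	sess, err := session.NewSession(&aws.Config{
		Region: aws.String("an-aws-region"),
	})
	handleErr(err)

	// Obtain the S3 client and request the whole object.
	s3Client := s3.New(sess)
	object, err := s3Client.GetObject(&s3.GetObjectInput{
		Bucket: aws.String("bucket-name"),
		Key:    aws.String("object-key"),
	})
	handleErr(err)
	defer object.Body.Close()

	// Read the entire body into memory, then split it into lines.
	body, err := io.ReadAll(object.Body)
	handleErr(err)
	for _, line := range strings.Split(string(body), "\n") {
		if line == "" {
			continue
		}
		log.Println("JSONL: ", line)
	}
}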

This implementation has two problems. Given the size of the file, the HTTP connection will probably close before the download completes. And even if we manage to pull the complete object, we will not have enough memory to hold it. The SDK's S3 download manager (s3manager.Downloader) can split an object into smaller parts and download them in parallel. It could potentially solve the problem if there were enough disk space to hold the file.

Total size of object

Before requesting any part of the object, we need to find out its total size. Since we will request it in chunks, we need a way to know when we are finished. The AWS SDK for Go provides the HeadObject function, which retrieves an object's metadata without returning the object itself.

	objectMetadata, err := s3Client.HeadObject(&s3.HeadObjectInput{
		Bucket: aws.String("bucket-name"),
		Key:    aws.String("object-key"),
	})
	handleErr(err)

	objectSize := int(*objectMetadata.ContentLength)

Get a chunk of the object

The AWS SDK supports ranged HTTP GET requests through the Range field, which follows the byte-range syntax of RFC 2616. Both ends of a range are inclusive, so for the first 1000 bytes of the file, the get request becomes:

	object, err := s3Client.GetObject(&s3.GetObjectInput{
		Bucket: aws.String("bucket-name"),
		Key:    aws.String("object-key"),
		Range:  aws.String(fmt.Sprintf("bytes=%d-%d", 0, 999)),
	})

And all together

Up to this point, we know how big the file is and how to get it in chunks; what is left is to put it all together. Since the file in my use case had one JSON object per line, I had to do some extra bookkeeping to process only complete lines and adjust the range accordingly, so that the subsequent request would start at the beginning of a line. This extra step, and the fact that I would store every line in Redis, is why I did not implement any parallelization. If any error occurs, I prefer my processing to stop and not end up with rubbish or incomplete data.

package main

import (
	"fmt"
	"io"
	"log"
	"strings"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)

func main() {
	handleErr := func(e error) {
		if e != nil {
			log.Fatal(e)
		}
	}
	// Create the S3 session and obtain the S3 client for querying the bucket.
	sess, err := session.NewSession(&aws.Config{
		Region: aws.String("an-aws-region"),
	})
	handleErr(err)

	s3Client := s3.New(sess)
	objectMetadata, err := s3Client.HeadObject(&s3.HeadObjectInput{
		Bucket: aws.String("bucket-name"),
		Key:    aws.String("object-key"),
	})
	handleErr(err)

	objectSize := int(*objectMetadata.ContentLength)

	// pos is the position of the first byte of the request range.
	// off is the position of the last byte of the request range.
	// delta is the number of trailing bytes that did not form a complete line;
	// it is used to move pos back so the next request reads them again.
	pos := 0
	off := 0
	delta := 0

	for off <= objectSize {
		// The new pos is the previous off adjusted by delta.
		pos = off - delta
		// Advance off by the chunk size (16 MiB), capped at the end of the file.
		if off = off + (16 * 1024 * 1024); off > objectSize {
			off = objectSize
		}
		// If the start and end positions are equal, we are done.
		if pos == off {
			break
		}
		// Edge case: the previous chunk ended with a full JSON line. This means
		// that delta is 0 and the next request should start one byte after the
		// last byte of the previous request.
		if delta == 0 && pos != 0 {
			pos = pos + 1
		}

		log.Printf("position: %d, offset: %d, delta: %d\n", pos, off, delta)

		inputObject := getS3InputObject(aws.String(fmt.Sprintf("bytes=%d-%d", pos, off)))
		objectOutputChunk, err := s3Client.GetObject(inputObject)
		handleErr(err)

		objectBodyRaw, err := io.ReadAll(objectOutputChunk.Body)
		// Close each chunk's body so the underlying connection can be reused.
		objectOutputChunk.Body.Close()
		handleErr(err)

		for _, line := range strings.Split(string(objectBodyRaw), "\n") {
			if line == "" {
				continue
			}
			// A line that does not end with "}" is treated as cut off by the
			// chunk boundary. This is a heuristic: a chunk that happens to end
			// right after a nested "}" would be mistaken for a complete line.
			if !strings.HasSuffix(line, "}") {
				delta = len(line)
				break
			}
			delta = 0
			log.Println("JSONL: ", line)
			// unmarshal to a struct for DB
		}

		// This request already reached the end of the object. Stop here even if
		// a trailing partial line is left; otherwise it would be requested forever.
		if off == objectSize {
			break
		}
	}
}

func getS3InputObject(requestRange *string) *s3.GetObjectInput {
	return &s3.GetObjectInput{
		Bucket: aws.String("bucket-name"),
		Key:    aws.String("object-key"),
		Range:  requestRange,
	}
}
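The check for a trailing "}" in the loop above is a heuristic. An alternative, which is not what the code above does, is to treat only newline-terminated data as complete and carry the unfinished tail over to the next chunk. With that approach the ranges can simply be consecutive, with no adjustment of the start position. The sketch below shows the idea on in-memory chunks; splitComplete is a hypothetical helper, and the hard-coded chunks stand in for the bodies of successive ranged requests.

```go
package main

import (
	"bytes"
	"fmt"
)

// splitComplete prepends carry (the unfinished tail of the previous chunk) to
// chunk and returns every newline-terminated, non-empty line, plus the new
// unfinished tail to pass into the next call.
func splitComplete(carry, chunk []byte) (lines [][]byte, rest []byte) {
	buf := append(append([]byte{}, carry...), chunk...)
	last := bytes.LastIndexByte(buf, '\n')
	if last < 0 {
		// No newline yet, so nothing is known to be complete.
		return nil, buf
	}
	for _, line := range bytes.Split(buf[:last], []byte("\n")) {
		if len(line) > 0 {
			lines = append(lines, line)
		}
	}
	return lines, buf[last+1:]
}

func main() {
	// The first chunk ends right after a nested "}", in the middle of a line.
	chunks := []string{"{\"a\":{\"b\":1}", "}\n{\"a\":2}\n{\"a\"", ":3}"}
	var carry []byte
	for _, c := range chunks {
		var lines [][]byte
		lines, carry = splitComplete(carry, []byte(c))
		for _, l := range lines {
			fmt.Printf("%s\n", l)
		}
	}
	// After the final chunk, the leftover is the last line of a file that has
	// no trailing newline.
	if len(carry) > 0 {
		fmt.Printf("%s\n", carry)
	}
}
```

The trade-off is that the carried tail is copied on every call, which is negligible when lines are short compared to the chunk size.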

There are various improvements that can be made, but this is a good start if you want to read and parse large files from S3.