
DynamoDB Go SDK: How to use the Scan and Batch operations efficiently | by Abhishek Gupta | Dec, 2022


Parallel scan with the DynamoDB Go SDK (image by author)

Learn with practical code examples

The DynamoDB Scan API accesses every item in a table (or secondary index). It is the equivalent of a select * from query. One of the things I will cover in this blog is how to use the Scan API with the DynamoDB Go SDK.

To scan a table, we need some data to begin with! So in the process, I will also go into how to use the Batch API to write bulk data into DynamoDB. You can use the BatchWriteItem API to create or delete items in batches (of up to twenty-five), and you can combine these operations across multiple tables.

We will start simple and gradually improve our approach to use the APIs efficiently. I will also go over some of the basic tests that I ran to demonstrate the incremental improvements. Finally, I will wrap up by highlighting some of the considerations while using these operations.

You can refer to the code on GitHub.

… make sure to create a DynamoDB table called users (see the sketch after the screenshot below if you prefer to do this from code) with:

  • partition key email (data type String) and
  • On-Demand capacity mode.
DynamoDB table (image by author)
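
If you would rather create the table from code than from the console, here is a rough sketch using the Go SDK. The helper name createTable is mine, and it assumes the same package-level client (*dynamodb.Client) that the rest of the examples use:

func createTable() {
    _, err := client.CreateTable(context.Background(), &dynamodb.CreateTableInput{
        TableName:   aws.String("users"),
        BillingMode: types.BillingModePayPerRequest, // On-Demand capacity mode
        AttributeDefinitions: []types.AttributeDefinition{
            {AttributeName: aws.String("email"), AttributeType: types.ScalarAttributeTypeS},
        },
        KeySchema: []types.KeySchemaElement{
            {AttributeName: aws.String("email"), KeyType: types.KeyTypeHash},
        },
    })
    if err != nil {
        log.Fatal("create table failed", err)
    }
}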

Also, I would like to call out a few things to set the context:

  • The table was created in us-east-1 and the tests were executed from an EC2 instance in us-east-1 as well.
  • Since these are general tests rather than specialized benchmarks, I did not do any special tuning (at any level). These are just Go functions that were executed with different inputs, keeping things as simple as possible.
  • The tests include marshalling (converting a Go struct to DynamoDB data types) for the BatchWriteItem operations and unmarshalling (converting from DynamoDB data types back to a Go struct) for the Scan operation.

Let's start off by exploring the BatchWriteItem API. This way we will have data to work with for the Scan operations as well.

Win-win!

Since you can combine up to 25 items in a single invocation, using a batch approach for bulk data imports is much better compared to invoking PutItem in a loop (or even in parallel).

Here is a basic example of how you would use BatchWriteItem:

func basicBatchImport() {
    // assumes package-level client (*dynamodb.Client), table (string) and the User struct (see the GitHub repo)
    startTime := time.Now()

    cities := []string{"NJ", "NY", "ohio"}
    batch := make(map[string][]types.WriteRequest)
    var requests []types.WriteRequest

    for i := 1; i <= 25; i++ {
        user := User{Email: uuid.NewString() + "@foo.com", Age: rand.Intn(49) + 1, City: cities[rand.Intn(len(cities))]}
        item, _ := attributevalue.MarshalMap(user)
        requests = append(requests, types.WriteRequest{PutRequest: &types.PutRequest{Item: item}})
    }

    batch[table] = requests

    op, err := client.BatchWriteItem(context.Background(), &dynamodb.BatchWriteItemInput{
        RequestItems: batch,
    })
    if err != nil {
        log.Fatal("batch write error", err)
    } else {
        log.Println("batch insert done")
    }

    if len(op.UnprocessedItems) != 0 {
        log.Println("there were", len(op.UnprocessedItems), "unprocessed records")
    }

    log.Println("inserted", (25 - len(op.UnprocessedItems)), "records in", time.Since(startTime).Seconds(), "seconds")
}

With BatchWriteItemInput, we can define the operations we want to perform in the batch; here we are just going to perform PutRequests (which is encapsulated within another type called WriteRequest).

We assemble the WriteRequests in a slice and finally put them in a map with the key being the table name; this is exactly what the RequestItems attribute in BatchWriteItemInput needs.

In this case we are dealing with a single table, but you could execute operations on multiple tables.

In this example we just dealt with one batch of 25 records (the maximum permitted batch size). If we want to import more records, all we need to do is split them into batches of 25 and execute them one (sub)batch at a time. Simple enough; here is an example:

func basicBatchImport2(total int) {

    startTime := time.Now()

    cities := []string{"NJ", "NY", "ohio"}
    batchSize := 25
    processed := total

    for num := 1; num <= total; num = num + batchSize {

        batch := make(map[string][]types.WriteRequest)
        var requests []types.WriteRequest

        start := num
        end := num + 24

        for i := start; i <= end; i++ {
            user := User{Email: uuid.NewString() + "@foo.com", Age: rand.Intn(49) + 1, City: cities[rand.Intn(len(cities))]}
            item, _ := attributevalue.MarshalMap(user)
            requests = append(requests, types.WriteRequest{PutRequest: &types.PutRequest{Item: item}})
        }

        batch[table] = requests

        op, err := client.BatchWriteItem(context.Background(), &dynamodb.BatchWriteItemInput{
            RequestItems: batch,
        })

        if err != nil {
            log.Fatal("batch write error", err)
        }

        if len(op.UnprocessedItems) != 0 {
            processed = processed - len(op.UnprocessedItems)
        }
    }

    log.Println("all batches finished. inserted", processed, "records in", time.Since(startTime).Seconds(), "seconds")

    if processed != total {
        log.Println("there were", (total - processed), "unprocessed records")
    }
}

I tried this with 50000 records (which means 2000 batches) and it took roughly 15 seconds. But we can do much better!

Parallel batch import

Instead of processing each batch sequentially, we can spin up a goroutine for each batch:

func parallelBatchImport(numRecords int) {

    startTime := time.Now()

    cities := []string{"NJ", "NY", "ohio"}
    batchSize := 25

    var wg sync.WaitGroup

    processed := numRecords

    for num := 1; num <= numRecords; num = num + batchSize {
        start := num
        end := num + 24

        wg.Add(1)

        go func(s, e int) {
            defer wg.Done()

            batch := make(map[string][]types.WriteRequest)
            var requests []types.WriteRequest

            for i := s; i <= e; i++ {
                user := User{Email: uuid.NewString() + "@foo.com", Age: rand.Intn(49) + 1, City: cities[rand.Intn(len(cities))]}

                item, err := attributevalue.MarshalMap(user)
                if err != nil {
                    log.Fatal("marshal map failed", err)
                }
                requests = append(requests, types.WriteRequest{PutRequest: &types.PutRequest{Item: item}})
            }

            batch[table] = requests

            op, err := client.BatchWriteItem(context.Background(), &dynamodb.BatchWriteItemInput{
                RequestItems: batch,
            })

            if err != nil {
                log.Fatal("batch write error", err)
            }

            if len(op.UnprocessedItems) != 0 {
                // note: unsynchronized update from multiple goroutines; use sync/atomic or a mutex for exact counts
                processed = processed - len(op.UnprocessedItems)
            }

        }(start, end)
    }

    log.Println("waiting for all batches to finish....")
    wg.Wait()

    log.Println("all batches finished. inserted", processed, "records in", time.Since(startTime).Seconds(), "seconds")

    if processed != numRecords {
        log.Println("there were", (numRecords - processed), "unprocessed records")
    }
}

The results improved by a good margin. Here is what I got, on an average:

  • Inserted 50000 records in ~2.5 seconds
  • Inserted 100000 records in ~4.5 to 5 seconds
  • Inserted 150000 records in less than 9.5 seconds
  • Inserted 200000 records in less than 11.5 seconds

There may be unprocessed records in a batch. This example detects those records, but the retry logic has been skipped to keep things simple. Ideally you should have an (exponential back-off based) retry mechanism for handling unprocessed records as well, along the lines of the sketch below.
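
Here is a minimal sketch of what such a retry loop could look like. The helper name retryUnprocessed, the number of attempts and the base delay are my own choices, not part of the original code:

// retryUnprocessed re-submits unprocessed items with exponential back-off.
func retryUnprocessed(unprocessed map[string][]types.WriteRequest) {
    const maxRetries = 5
    delay := 100 * time.Millisecond

    for attempt := 1; attempt <= maxRetries && len(unprocessed) != 0; attempt++ {
        time.Sleep(delay)
        delay = delay * 2 // exponential back-off

        op, err := client.BatchWriteItem(context.Background(), &dynamodb.BatchWriteItemInput{
            RequestItems: unprocessed,
        })
        if err != nil {
            log.Fatal("batch write retry error", err)
        }
        unprocessed = op.UnprocessedItems
    }

    if len(unprocessed) != 0 {
        log.Println("giving up with unprocessed items still remaining for", len(unprocessed), "table(s)")
    }
}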

To insert more data, I ran the parallelBatchImport function (above) in a loop. For example:

for i := 1; i <= 100; i++ {
    parallelBatchImport(50000)
}

Alright, let's move ahead. Now that we have some data, let's try …

This is what basic usage looks like:

func scan() {
    startTime := time.Now()

    op, err := client.Scan(context.Background(), &dynamodb.ScanInput{
        TableName:              aws.String(table),
        ReturnConsumedCapacity: types.ReturnConsumedCapacityTotal,
    })

    if err != nil {
        log.Fatal("scan failed", err)
    }

    for _, i := range op.Items {
        var u User
        err := attributevalue.UnmarshalMap(i, &u)
        if err != nil {
            log.Fatal("unmarshal failed", err)
        }
    }

    if op.LastEvaluatedKey != nil {
        log.Println("all items have not been scanned")
    }
    log.Println("scanned", op.ScannedCount, "items in", time.Since(startTime).Seconds(), "seconds")
    log.Println("consumed capacity", *op.ConsumedCapacity.CapacityUnits)
}

Simply provide the table (or secondary index) name and you are good to go! But there is a chance that you might not be able to get all the items because of API limits (1 MB worth of data per invocation). In my case it took about 0.5 seconds for about 15000 records; the rest of the items were skipped because the 1 MB limit was breached.

Using Pagination

To handle this limitation around data, the Scan API returns LastEvaluatedKey in its output to point to the last processed record. All you need to do is invoke Scan again, with the value of the ExclusiveStartKey attribute set to that of LastEvaluatedKey, as sketched below.
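
A minimal sketch of such a paginated scan is shown below. The function name paginatedScan is my own; it reuses the package-level client and table from the earlier examples:

func paginatedScan() {
    startTime := time.Now()
    total := 0

    scip := &dynamodb.ScanInput{
        TableName: aws.String(table),
    }

    for {
        op, err := client.Scan(context.Background(), scip)
        if err != nil {
            log.Fatal("scan failed", err)
        }

        total = total + int(op.Count)

        // an empty LastEvaluatedKey means this was the last page
        if len(op.LastEvaluatedKey) == 0 {
            break
        }
        scip.ExclusiveStartKey = op.LastEvaluatedKey
    }

    log.Println("scanned", total, "items in", time.Since(startTime).Seconds(), "seconds")
}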

Using the paginated scan approach, it took me roughly 100 seconds to scan ~7.5 million records.

Parallel Scan

Pagination helps, but it is still a sequential process. There is a lot of scope for improvement. Thankfully, Scan allows you to adopt a parallelized approach, i.e. you can use multiple workers (goroutines in this case) to process the data in parallel!

func parallelScan(pageSize, totalWorkers int) {
    log.Println("parallel scan with page size", pageSize, "and", totalWorkers, "goroutines")
    startTime := time.Now()

    var total int

    var wg sync.WaitGroup
    wg.Add(totalWorkers)

    for i := 0; i < totalWorkers; i++ {
        // start a goroutine for each segment

        go func(segId int) {
            var segTotal int

            defer wg.Done()

            lastEvaluatedKey := make(map[string]types.AttributeValue)

            scip := &dynamodb.ScanInput{
                TableName:     aws.String(table),
                Limit:         aws.Int32(int32(pageSize)),
                Segment:       aws.Int32(int32(segId)),
                TotalSegments: aws.Int32(int32(totalWorkers)),
            }

            for {
                if len(lastEvaluatedKey) != 0 {
                    scip.ExclusiveStartKey = lastEvaluatedKey
                }
                op, err := client.Scan(context.Background(), scip)

                if err != nil {
                    log.Fatal("scan failed", err)
                }

                segTotal = segTotal + int(op.Count)

                for _, i := range op.Items {

                    var u User
                    err := attributevalue.UnmarshalMap(i, &u)
                    if err != nil {
                        log.Fatal("unmarshal failed", err)
                    }
                }

                if len(op.LastEvaluatedKey) == 0 {
                    log.Println("[ segment", segId, "] finished")
                    // note: unsynchronized update from multiple goroutines; use sync/atomic or a mutex for exact counts
                    total = total + segTotal
                    log.Println("total records processed by segment", segId, "=", segTotal)
                    return
                }

                lastEvaluatedKey = op.LastEvaluatedKey
            }
        }(i)
    }

    log.Println("waiting...")
    wg.Wait()

    log.Println("done...")
    log.Println("scanned", total, "items in", time.Since(startTime).Seconds(), "seconds")
}

The Segment and TotalSegments attributes are the key to how the Scan API allows parallelism. TotalSegments is nothing but the number of threads/goroutines/worker-processes that need to be spawned, and Segment is a unique identifier for each of them.

In my tests, the Scan performance remained (almost) constant at 37-40 seconds (on average) for about ~7.5 million records (I tried a variety of page size and goroutine combinations).

How many TotalSegments do I need to configure?

To tune the appropriate number of parallel threads/workers, you might need to experiment a bit. A lot might depend on your client environment:

  • Do you have enough compute resources?
  • Some environments/runtimes might have managed thread-pools, so you will have to comply with those

So, you will need to try things out to find the optimal parallelism for your case. One way to think about it could be to choose one segment (a single worker/thread/goroutine) per unit of data, say a segment for every GB of data you want to scan; a rough sketch of this heuristic follows.
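
For what it's worth, here is a rough sketch of that heuristic. segmentsForTable is a hypothetical helper (not part of the original code) that derives the segment count from the table size reported by DescribeTable; DynamoDB only refreshes that size periodically, so treat it as an estimate:

// segmentsForTable estimates TotalSegments using the "one segment per GB" rule of thumb.
func segmentsForTable() int32 {
    op, err := client.DescribeTable(context.Background(), &dynamodb.DescribeTableInput{
        TableName: aws.String(table),
    })
    if err != nil {
        log.Fatal("describe table failed", err)
    }

    const gb = 1024 * 1024 * 1024
    return int32(*op.Table.TableSizeBytes/gb) + 1
}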

Both the Batch and Scan APIs are quite powerful, but there are nuances you should be aware of. My advice is to read the API documentation thoroughly.

With Batch APIs:

  • No more than 25 requests in a batch
  • Individual items in a batch should not exceed 400 KB
  • The total size of items in a single BatchWriteItem cannot be more than 16 MB
  • BatchWriteItem cannot update items
  • You cannot specify conditions on individual put and delete requests
  • It does not return deleted items in the response
  • If there are failed operations, you can access them via the UnprocessedItems response parameter

Use Scan wisely

Since a Scan operation goes over the entire table (or secondary index), it is very likely to consume a big chunk of the provisioned throughput, especially if it is a large table. That being said, Scan should be your last resort. Check whether the Query API (or BatchGetItem) works for your use case; a rough sketch of a Query on our table follows.
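
For example, since email is the partition key of our users table, a point lookup by email can be expressed as a Query instead of a Scan. This is a rough sketch under the assumption that the User struct maps its Email field to the email attribute; the function name and the log output are mine:

// queryByEmail fetches the item(s) for a single partition key value.
func queryByEmail(email string) {
    op, err := client.Query(context.Background(), &dynamodb.QueryInput{
        TableName:              aws.String(table),
        KeyConditionExpression: aws.String("email = :e"),
        ExpressionAttributeValues: map[string]types.AttributeValue{
            ":e": &types.AttributeValueMemberS{Value: email},
        },
    })
    if err != nil {
        log.Fatal("query failed", err)
    }

    for _, i := range op.Items {
        var u User
        if err := attributevalue.UnmarshalMap(i, &u); err != nil {
            log.Fatal("unmarshal failed", err)
        }
        log.Println("found user in city", u.City)
    }
}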

The same applies to a parallel Scan.

There are multiple ways in which you can further narrow down the results: by using a Filter Expression, a Limit parameter (as demonstrated earlier), or a ProjectionExpression to return only a subset of attributes.
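
As an illustration, here is a rough sketch of a Scan that combines all three: a Limit, a FilterExpression on the city attribute and a ProjectionExpression that returns only email. The attribute names assume the same mapping as the User struct used earlier, and keep in mind that the filter is applied after the items are read, so the Scan still consumes capacity for everything it examines:

func filteredScanSketch() {
    op, err := client.Scan(context.Background(), &dynamodb.ScanInput{
        TableName:            aws.String(table),
        Limit:                aws.Int32(100),
        FilterExpression:     aws.String("city = :c"),
        ProjectionExpression: aws.String("email"),
        ExpressionAttributeValues: map[string]types.AttributeValue{
            ":c": &types.AttributeValueMemberS{Value: "NJ"},
        },
    })
    if err != nil {
        log.Fatal("scan failed", err)
    }
    log.Println("matched", op.Count, "of", op.ScannedCount, "scanned items")
}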

That's all for this blog. I hope you found it useful.

Until next time, Happy coding!
