Learn with practical code examples
The DynamoDB Scan API accesses every item in a table (or secondary index). It is the equivalent of a select * from query. One of the things I'll cover in this blog is how to use the Scan API with the DynamoDB Go SDK.
To scan a table, we need some data to begin with! So along the way, I will also go into how to use the Batch API to write bulk data in DynamoDB. You can use the BatchWriteItem API to create or delete items in batches (of twenty five), and it is possible to combine these operations across multiple tables.
We will start simple and gradually improve our approach to use the APIs efficiently. I will also go over some of the basic tests that I ran to demonstrate the incremental improvements. Finally, I'll wrap up by highlighting some of the considerations while using these operations.
You can refer to the code on GitHub.
… make sure to create a DynamoDB table called users with:
- partition key email (data type String) and On-Demand capacity mode (a table-creation sketch follows right after this list)
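If you would rather create the table from code than from the console, here is a minimal sketch using the same Go SDK packages used in the rest of this post (it assumes an initialized DynamoDB client named client):

// minimal sketch: create the users table with an email partition key
// and On-Demand (pay per request) capacity mode
_, err := client.CreateTable(context.Background(), &dynamodb.CreateTableInput{
    TableName: aws.String("users"),
    AttributeDefinitions: []types.AttributeDefinition{
        {AttributeName: aws.String("email"), AttributeType: types.ScalarAttributeTypeS},
    },
    KeySchema: []types.KeySchemaElement{
        {AttributeName: aws.String("email"), KeyType: types.KeyTypeHash},
    },
    BillingMode: types.BillingModePayPerRequest, // On-Demand capacity mode
})
if err != nil {
    log.Fatal("create table failed", err)
}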
Also, there are a few things I would like to call out to set the context:
- The table was created in us-east-1 and the tests were executed from an EC2 instance in us-east-1 as well
- Since these are general tests rather than specialized benchmarks, I did not do any special tuning (at any level). These are just Go functions that were executed with different inputs, keeping things as simple as possible.
- The tests include marshalling (converting a Go struct to DynamoDB data types) for BatchWriteItem operations and un-marshalling (converting from DynamoDB data types back to a Go struct) for the Scan operation. The struct used here is sketched right after this list.
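For reference, here is roughly what that struct looks like. The exact definition (including its dynamodbav tags) lives in the GitHub repo, so treat this as an assumption:

// approximate shape of the record used throughout this post;
// lowercase dynamodbav tags match the email partition key defined above
type User struct {
    Email string `dynamodbav:"email"`
    Age   int    `dynamodbav:"age"`
    City  string `dynamodbav:"city"`
}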
Let's start off by exploring the BatchWriteItem API. This way we will have data to work with for the Scan operations as well. Win-win!
Since you can combine 25 items in a single invocation, using a batch approach for bulk data imports is much better compared to invoking PutItem in a loop (or even in parallel).
Here is a basic example of how you would use BatchWriteItem:
func basicBatchImport() {
    startTime := time.Now()

    cities := []string{"NJ", "NY", "ohio"}
    batch := make(map[string][]types.WriteRequest)
    var requests []types.WriteRequest

    for i := 1; i <= 25; i++ {
        user := User{Email: uuid.NewString() + "@foo.com", Age: rand.Intn(49) + 1, City: cities[rand.Intn(len(cities))]}
        item, _ := attributevalue.MarshalMap(user)
        requests = append(requests, types.WriteRequest{PutRequest: &types.PutRequest{Item: item}})
    }

    batch[table] = requests

    op, err := client.BatchWriteItem(context.Background(), &dynamodb.BatchWriteItemInput{
        RequestItems: batch,
    })
    if err != nil {
        log.Fatal("batch write error", err)
    } else {
        log.Println("batch insert done")
    }

    if len(op.UnprocessedItems) != 0 {
        log.Println("there were", len(op.UnprocessedItems), "unprocessed records")
    }

    log.Println("inserted", (25 - len(op.UnprocessedItems)), "records in", time.Since(startTime).Seconds(), "seconds")
}
With BatchWriteItemInput, we can define the operations we want to perform in the batch. Here we are just going to perform PutRequests (which is encapsulated within another type called WriteRequest).
We assemble the WriteRequests in a slice and finally put them in a map with the key being the table name; this is exactly what the RequestItems attribute in BatchWriteItemInput needs.
In this case we are dealing with a single table, but you could execute operations on multiple tables.
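As a hedged illustration of that, the RequestItems map can carry requests for more than one table, and a batch can mix PutRequest and DeleteRequest entries. The orders table below is hypothetical and not part of this post's setup:

// sketch only: one batch spanning two tables, mixing a put and a delete
userItem, _ := attributevalue.MarshalMap(User{Email: "someone@foo.com", Age: 40, City: "NY"})

batch := map[string][]types.WriteRequest{
    "users": {
        {PutRequest: &types.PutRequest{Item: userItem}},
    },
    "orders": { // hypothetical second table
        {DeleteRequest: &types.DeleteRequest{
            Key: map[string]types.AttributeValue{
                "order_id": &types.AttributeValueMemberS{Value: "42"},
            },
        }},
    },
}

_, err := client.BatchWriteItem(context.Background(), &dynamodb.BatchWriteItemInput{
    RequestItems: batch,
})
if err != nil {
    log.Fatal("batch write error", err)
}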
In this example we just dealt with one batch of 25 records (the maximum permitted batch size). If we want to import more records, all we need to do is split them into batches of 25 and execute them one (sub)batch at a time. Simple enough. Here is an example:
func basicBatchImport2(total int) {
    startTime := time.Now()

    cities := []string{"NJ", "NY", "ohio"}
    batchSize := 25
    processed := total

    for num := 1; num <= total; num = num + batchSize {
        batch := make(map[string][]types.WriteRequest)
        var requests []types.WriteRequest

        start := num
        end := num + 24

        for i := start; i <= end; i++ {
            user := User{Email: uuid.NewString() + "@foo.com", Age: rand.Intn(49) + 1, City: cities[rand.Intn(len(cities))]}
            item, _ := attributevalue.MarshalMap(user)
            requests = append(requests, types.WriteRequest{PutRequest: &types.PutRequest{Item: item}})
        }

        batch[table] = requests

        op, err := client.BatchWriteItem(context.Background(), &dynamodb.BatchWriteItemInput{
            RequestItems: batch,
        })
        if err != nil {
            log.Fatal("batch write error", err)
        }

        if len(op.UnprocessedItems) != 0 {
            processed = processed - len(op.UnprocessedItems)
        }
    }

    log.Println("all batches finished. inserted", processed, "records in", time.Since(startTime).Seconds(), "seconds")

    if processed != total {
        log.Println("there were", (total - processed), "unprocessed records")
    }
}
I tried this with 50000 records (which means 2000 batches) and it took approximately 15 seconds. But we can do much better!
Parallel batch import
Instead of processing each batch sequentially, we can spin up a goroutine for each batch:
func parallelBatchImport(numRecords int) {
    startTime := time.Now()

    cities := []string{"NJ", "NY", "ohio"}
    batchSize := 25

    var wg sync.WaitGroup

    processed := numRecords

    for num := 1; num <= numRecords; num = num + batchSize {
        start := num
        end := num + 24

        wg.Add(1)

        go func(s, e int) {
            defer wg.Done()

            batch := make(map[string][]types.WriteRequest)
            var requests []types.WriteRequest

            for i := s; i <= e; i++ {
                user := User{Email: uuid.NewString() + "@foo.com", Age: rand.Intn(49) + 1, City: cities[rand.Intn(len(cities))]}
                item, err := attributevalue.MarshalMap(user)
                if err != nil {
                    log.Fatal("marshal map failed", err)
                }
                requests = append(requests, types.WriteRequest{PutRequest: &types.PutRequest{Item: item}})
            }

            batch[table] = requests

            op, err := client.BatchWriteItem(context.Background(), &dynamodb.BatchWriteItemInput{
                RequestItems: batch,
            })
            if err != nil {
                log.Fatal("batch write error", err)
            }

            if len(op.UnprocessedItems) != 0 {
                // note: processed is updated from multiple goroutines without synchronization;
                // good enough for this test, but prefer a mutex or atomic counter in real code
                processed = processed - len(op.UnprocessedItems)
            }
        }(start, end)
    }

    log.Println("waiting for all batches to finish....")
    wg.Wait()

    log.Println("all batches finished. inserted", processed, "records in", time.Since(startTime).Seconds(), "seconds")

    if processed != numRecords {
        log.Println("there were", (numRecords - processed), "unprocessed records")
    }
}
The results improved by a good margin. Here is what I got, on an average:
- Inserting 50000 records took ~ 2.5 seconds
- inserted 100000 records in ~ 4.5 to 5 seconds
- inserted 150000 records in less than 9.5 seconds
- inserted 200000 records in less than 11.5 seconds
There may be unprocessed records in a batch. This example detects those records, but the retry logic has been skipped to keep things simple. Ideally you should have an (exponential back-off based) retry mechanism for handling unprocessed records as well.
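If you do want to handle them, a minimal sketch of such a retry loop could look like this, picking up from the op returned by BatchWriteItem above (maxRetries and the 100ms back-off base are arbitrary values chosen for illustration):

// sketch: retry unprocessed items with exponential back-off
unprocessed := op.UnprocessedItems
maxRetries := 5

for attempt := 1; len(unprocessed) != 0 && attempt <= maxRetries; attempt++ {
    // back off before retrying: 100ms, 200ms, 400ms, ...
    time.Sleep(time.Duration(100*(1<<(attempt-1))) * time.Millisecond)

    retryOp, err := client.BatchWriteItem(context.Background(), &dynamodb.BatchWriteItemInput{
        RequestItems: unprocessed,
    })
    if err != nil {
        log.Fatal("retry batch write error", err)
    }
    unprocessed = retryOp.UnprocessedItems
}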
To insert more data, I ran the parallelBatchImport function (above) in loops. For example:
for i := 1; i <= 100; i++ {
parallelBatchImport(50000)
}
Alright, let's move ahead. Now that we have some data, let's try …
This is what basic usage looks like:
func scan() {
    startTime := time.Now()

    op, err := client.Scan(context.Background(), &dynamodb.ScanInput{
        TableName:              aws.String(table),
        ReturnConsumedCapacity: types.ReturnConsumedCapacityTotal,
    })
    if err != nil {
        log.Fatal("scan failed", err)
    }

    for _, i := range op.Items {
        var u User
        err := attributevalue.UnmarshalMap(i, &u)
        if err != nil {
            log.Fatal("unmarshal failed", err)
        }
    }

    if op.LastEvaluatedKey != nil {
        log.Println("all items have not been scanned")
    }

    log.Println("scanned", op.ScannedCount, "items in", time.Since(startTime).Seconds(), "seconds")
    log.Println("consumed capacity", *op.ConsumedCapacity.CapacityUnits)
}
Just provide the table (or secondary index) name and you are good to go! However, there is a chance you might not be able to get all the items because of API limits (1 MB worth of data per invocation). In my case it took about 0.5 seconds for about 15000 records; the rest of the items were skipped because the 1 MB limit was breached.
Using Pagination
To handle this limitation around data, the Scan API returns LastEvaluatedKey in its output to point to the last processed record. All you need to do is invoke Scan again, with the value of the ExclusiveStartKey attribute set to the one from LastEvaluatedKey.
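Here is a rough sketch of what that pagination loop could look like (it mirrors the structure of the scan function above; the actual code in the repo may differ slightly):

// sketch: paginated scan using LastEvaluatedKey / ExclusiveStartKey
func paginatedScan(pageSize int) {
    startTime := time.Now()

    var total int
    var lastEvaluatedKey map[string]types.AttributeValue

    for {
        scip := &dynamodb.ScanInput{
            TableName: aws.String(table),
            Limit:     aws.Int32(int32(pageSize)),
        }
        if len(lastEvaluatedKey) != 0 {
            scip.ExclusiveStartKey = lastEvaluatedKey
        }

        op, err := client.Scan(context.Background(), scip)
        if err != nil {
            log.Fatal("scan failed", err)
        }

        total = total + int(op.Count)

        // an empty LastEvaluatedKey means we have reached the last page
        if len(op.LastEvaluatedKey) == 0 {
            break
        }
        lastEvaluatedKey = op.LastEvaluatedKey
    }

    log.Println("scanned", total, "items in", time.Since(startTime).Seconds(), "seconds")
}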
Using the paginated scan approach took me roughly 100 seconds to scan ~ 7.5 million records.
Parallel Scan
Pagination helps, but it is still a sequential process. There is a lot of scope for improvement. Luckily, Scan allows you to adopt a parallelized approach, i.e. you can use multiple workers (goroutines in this case) to process data in parallel!
func parallelScan(pageSize, totalWorkers int) {
    log.Println("parallel scan with page size", pageSize, "and", totalWorkers, "goroutines")
    startTime := time.Now()

    var total int

    var wg sync.WaitGroup
    wg.Add(totalWorkers)

    for i := 0; i < totalWorkers; i++ {
        // start a goroutine for each segment
        go func(segId int) {
            var segTotal int

            defer wg.Done()

            lastEvaluatedKey := make(map[string]types.AttributeValue)

            scip := &dynamodb.ScanInput{
                TableName:     aws.String(table),
                Limit:         aws.Int32(int32(pageSize)),
                Segment:       aws.Int32(int32(segId)),
                TotalSegments: aws.Int32(int32(totalWorkers)),
            }

            for {
                if len(lastEvaluatedKey) != 0 {
                    scip.ExclusiveStartKey = lastEvaluatedKey
                }

                op, err := client.Scan(context.Background(), scip)
                if err != nil {
                    log.Fatal("scan failed", err)
                }

                segTotal = segTotal + int(op.Count)

                for _, i := range op.Items {
                    var u User
                    err := attributevalue.UnmarshalMap(i, &u)
                    if err != nil {
                        log.Fatal("unmarshal failed", err)
                    }
                }

                if len(op.LastEvaluatedKey) == 0 {
                    log.Println("[ segment", segId, "] finished")
                    // note: total is updated from multiple goroutines without synchronization
                    total = total + segTotal
                    log.Println("total records processed by segment", segId, "=", segTotal)
                    return
                }

                lastEvaluatedKey = op.LastEvaluatedKey
            }
        }(i)
    }

    log.Println("waiting...")
    wg.Wait()

    log.Println("done...")
    log.Println("scanned", total, "items in", time.Since(startTime).Seconds(), "seconds")
}
The Segment and TotalSegments attributes are key to how the Scan API allows parallelism. TotalSegments is nothing but the number of threads/goroutines/worker-processes that need to be spawned, and Segment is a unique identifier for each of them.
In my tests, the Scan performance remained (almost) constant at 37-40 seconds (on average) for about ~ 7.5 million records (I tried a variety of page size and goroutine combinations).
How many TotalSegments do I need to configure?
To tune the appropriate number of parallel threads/workers, you might need to experiment a bit. A lot might depend on your client environment.
- Do you have enough compute resources?
- Some environments/runtimes might have managed thread pools, so you will have to comply with those
So, you will need to try things out to find the optimal parallelism for your use case. One way to think about it could be to choose one segment (a single worker/thread/goroutine) per unit of data (say, a segment for every GB of data you want to scan).
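One hedged way to put that rule of thumb into code is to look up the (approximate) table size with DescribeTable and derive the segment count from it. This is just an illustration of the heuristic above, not official guidance:

// sketch: roughly one segment per GB of table data
out, err := client.DescribeTable(context.Background(), &dynamodb.DescribeTableInput{
    TableName: aws.String(table),
})
if err != nil {
    log.Fatal("describe table failed", err)
}

// TableSizeBytes is only updated periodically, so treat it as an approximation
const gb = 1 << 30
segments := int(*out.Table.TableSizeBytes/gb) + 1

parallelScan(5000, segments) // page size of 5000 is illustrative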
Both the Batch and Scan APIs are quite powerful, but there are nuances you should be aware of. My advice is to read the API documentation thoroughly.
With Batch APIs:
- No more than 25 requests in a batch
- An individual item in a batch should not exceed 400KB
- The total size of items in a single BatchWriteItem cannot be more than 16MB
- BatchWriteItem cannot update items
- You cannot specify conditions on individual put and delete requests
- It does not return deleted items in the response
- If there are failed operations, you can access them via the UnprocessedItems response parameter
Use Scan wisely
Since a Scan operation goes over the entire table (or secondary index), it is highly likely to consume a big chunk of the provisioned throughput, especially if it is a large table. That being said, Scan should be your last resort. Check whether the Query API (or BatchGetItem) works for your use-case.
The same applies to parallel Scan.
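For example, if you already know the partition key, a Query fetches just the matching item instead of walking the whole table. Here is a sketch (the email value is made up):

// sketch: fetch a single user by partition key instead of scanning
op, err := client.Query(context.Background(), &dynamodb.QueryInput{
    TableName:              aws.String(table),
    KeyConditionExpression: aws.String("email = :e"),
    ExpressionAttributeValues: map[string]types.AttributeValue{
        ":e": &types.AttributeValueMemberS{Value: "someone@foo.com"}, // made-up value
    },
})
if err != nil {
    log.Fatal("query failed", err)
}
log.Println("query returned", op.Count, "item(s)")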
There are a few ways in which you can further narrow down the results: by using a Filter Expression, a Limit parameter (as demonstrated earlier), or a ProjectionExpression to return only a subset of attributes.
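Here is a sketch that combines those options on a single Scan (the attribute names follow the lowercase convention used for the users table; the filter value and limit are arbitrary):

// sketch: narrow down a Scan with a filter, a limit and a projection
op, err := client.Scan(context.Background(), &dynamodb.ScanInput{
    TableName:        aws.String(table),
    Limit:            aws.Int32(100),
    FilterExpression: aws.String("city = :c"),
    ExpressionAttributeValues: map[string]types.AttributeValue{
        ":c": &types.AttributeValueMemberS{Value: "NJ"},
    },
    ProjectionExpression: aws.String("email, age"),
})
if err != nil {
    log.Fatal("scan failed", err)
}
log.Println("matched", op.Count, "items out of", op.ScannedCount, "scanned")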
That's all for this blog. I hope you found it useful.
Until next time, Happy coding!