Resuming the Google Bulk Data Uploader for App Engine
Google App Engine does not provide a straightforward way to upload data to the datastore. There is one article that describes a way to do it: you upload a CSV file to your App Engine app by making repeated HTTP requests. While this works for smaller data sets (1,000 records or fewer), it fails for bigger ones. I had to upload over 2 million records for my salary search app, and the program kept failing. Restarting a bulk upload resets the counter, so it starts again from the beginning and creates duplicate data.
I have modified the bulk upload program to do the following:
- Introduced a parameter called “skip”, which skips the first N records when the bulk data uploader resumes.
- The bulk data uploader often fails because of time-outs or network issues. Instead of aborting in such a scenario, the program sleeps for 1 minute before resuming from where it left off.
- The program writes a progress log to a text file. In case of a failure, the log file shows where the program stopped last time, so you can skip that many records when resuming.
- Sample call:
bulkload_client_resume.py --filename upload_me.csv --kind Visa --url http://<your app>.appspot.com/load --skip 1000
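The skip and retry behaviour in the list above can be sketched in isolation. This is a minimal illustration, not the SDK code: upload_batches, post, retry_delay and sleep are hypothetical names, and IOError stands in for whatever error the real HTTP post raises.

```python
import time


def upload_batches(batches, post, skip=0, retry_delay=60, sleep=time.sleep):
    """Posts batches in order, skipping records that were already uploaded.

    batches: iterable of lists of records (hypothetical stand-in for the
      CSV content generator).
    post: callable that uploads one batch and raises IOError on failure
      (hypothetical stand-in for the HTTP post).
    skip: number of records to skip before uploading resumes.
    Returns the running record count after the last batch.
    """
    count = 0
    for batch in batches:
        if count < skip:
            # Uploaded in an earlier run: advance the counter only.
            count += len(batch)
            continue
        while True:
            try:
                post(batch)
                break
            except IOError:
                # Wait, then try the same batch again instead of aborting.
                sleep(retry_delay)
        count += len(batch)
    return count
```

Because whole batches are skipped, a skip value that is not a multiple of the batch size is effectively rounded up to the next batch boundary, so it is safest to pass a multiple of the batch size.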
In order to resume the Bulk Data uploader, you have to modify the following sources in your SDK:
- bulkload_client.py under Google\google_appengine\tools
  - Make a copy of the existing file and create a new file with the name bulkload_client_resume.py
  - Change the following line of code in bulkload_client_resume.py:
    BULKLOAD_CLIENT_PATH = 'google/appengine/tools/bulkload_client_resume.py'
- bulkload_client.py under Google\google_appengine\google\appengine\tools
  - Make a copy of the existing file and create a new file with the name bulkload_client_resume.py
  - Modify the ImportCSV function
    - Introduce a new parameter called skip, which skips X records before resuming the upload.
def ImportCSV(filename,
              post_url,
              cookie,
              batch_size,
              kind,
              skip,
              split_url=SplitURL,
              openfile=file,
              create_content_generator=ContentGenerator,
              post_entities=PostEntities):
  """Imports CSV data using a series of HTTP posts.

  Args:
    filename: File on disk containing CSV data.
    post_url: URL to post the Entity data to.
    cookie: Full cookie header to use while connecting.
    batch_size: Maximum number of Entity objects to post with each request.
    kind: Entity kind of the objects being posted.
    skip: Number of records to skip before uploading resumes.
    split_url, openfile, create_content_generator, post_entities: Used for
      dependency injection.

  Returns:
    True if all entities were imported successfully; False otherwise.
  """
  host_port, uri = split_url(post_url)
  csv_file = openfile(filename, 'r')
  retry = 1
  i = 0
  try:
    content_gen = create_content_generator(csv_file, batch_size)
    logging.info('Starting import; maximum %d entities per post', batch_size)
    for num_entities, content in content_gen:
      retry = 1
      # logic to skip
      if i < int(skip):
        logging.info('Skipping entities in %d skip %d', i, int(skip))
        i = i + int(batch_size)
      else:
        logging.info('Importing %d entities in %d bytes current count %d',
                     num_entities, len(content), i)
        LogMessage('Importing entities in bytes current count ' + str(i) + '\n')
        LogMessage(time.ctime(time.time()) + ' :Importing entities in bytes current count ' + str(i) + '\n')
        while retry == 1:
          try:
            content = post_entities(host_port, uri, cookie, kind, content)
            retry = 0
            i = i + int(batch_size)
          except PostError, e:
            logging.error('An error occurred while importing: %s', e)
            logging.error('Going to sleep')
            LogMessage(time.ctime(time.time()) + ' ERROR Count:' + str(i))
            # Sleeps for 1 min before retrying the same record
            time.sleep(60)
            logging.error('Retrying')
            retry = 0
  finally:
    csv_file.close()
  return True
  - Introduce a new function, which writes an upload log to a log file.
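# Needs "import sys" and "import time" at the top of the file if they are
# not there already.
def LogMessage(message):
  """Appends message to bulkupload.log in the current directory."""
  logfile = None  # stays None if open() fails, so the check below is safe
  try:
    logfile = open('bulkupload.log', 'a')
    logfile.write(message)
  except:
    logging.error(sys.exc_info()[0])
  if logfile:
    logfile.close()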
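To find the value to pass as skip after a failure, the log can be read back instead of inspected by hand. The helper below is a rough sketch that assumes the message formats written by the code above ("current count N" and "ERROR Count:N"); last_logged_count is a hypothetical name and is not part of the SDK.

```python
import re

# Matches the counters written by the modified ImportCSV, for example
# "... current count 1500" and "... ERROR Count:1500".
_COUNT_RE = re.compile(r'(?:current count |ERROR Count:)(\d+)')


def last_logged_count(log_text):
    """Returns the last record count found in the log text, or 0 if none."""
    counts = _COUNT_RE.findall(log_text)
    return int(counts[-1]) if counts else 0


# Example use (assumes bulkupload.log exists in the current directory):
#   with open('bulkupload.log') as log_file:
#       print(last_logged_count(log_file.read()))
```

The number returned is the count logged when the last batch was attempted. If the process died in the middle of that post, the batch may or may not have been stored, so resuming from this value can still re-send one batch.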
Feel free to drop a note if you have trouble getting this to work
December 19th, 2008 at 8:24 pm
Look at this method in your code. I changed it because there is a bug: after an error it jumps to the next batch without making a new attempt!
def ImportCSV(filename,
              post_url,
              cookie,
              batch_size,
              kind,
              skip,
              split_url=SplitURL,
              openfile=file,
              create_content_generator=ContentGenerator,
              post_entities=PostEntities):
  """Imports CSV data using a series of HTTP posts.

  Args:
    filename: File on disk containing CSV data.
    post_url: URL to post the Entity data to.
    cookie: Full cookie header to use while connecting.
    batch_size: Maximum number of Entity objects to post with each request.
    kind: Entity kind of the objects being posted.
    split_url, openfile, create_content_generator, post_entities: Used for
      dependency injection.

  Returns:
    True if all entities were imported successfully; False otherwise.
  """
  host_port, uri = split_url(post_url)
  csv_file = openfile(filename, 'r')
  i = 0
  try:
    content_gen = create_content_generator(csv_file, batch_size)
    logging.info('Starting import; maximum %d entities per post', batch_size)
    for num_entities, content in content_gen:
      # logic to skip
      if i < int(skip):
        logging.info('Skipping entities in %d skip %d', i, int(skip))
        i = i + int(batch_size)
      else:
        retry = 1
        logging.info('Importing %d entities in %d bytes current count %d',
                     num_entities, len(content), i)
        LogMessage('Importing entities in bytes current count ' + str(i) + '\n')
        LogMessage(time.ctime(time.time()) + ' :Importing entities in bytes current count ' + str(i) + '\n')
        while retry == 1:
          try:
            content = post_entities(host_port, uri, cookie, kind, content)
            retry = 0
            i = i + int(batch_size)
          except PostError, e:
            logging.error('An error occurred while importing: %s', e)
            logging.error('Going to sleep')
            LogMessage(time.ctime(time.time()) + ' ERROR Count:' + str(i))
            time.sleep(2)
            logging.error('Retrying…')
            # NO retry=0!!!
            retry = 1
  finally:
    csv_file.close()
  return True
January 18th, 2009 at 8:51 am
This “introduce a parameter” guidance could probably use a deeper explanation. For those wondering what the complete resulting code looks like, take a look here:
http://gist.github.com/48681
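Independently of that gist, the part the post leaves out is how the skip value gets from the command line into ImportCSV. A minimal sketch follows, assuming the script parses its flags with Python's standard getopt module. The filename, kind, url and skip flags come from the sample call in the post; the batch_size flag name and the parse_skip helper are assumptions made for illustration.

```python
import getopt


def parse_skip(argv):
    """Returns the integer value of --skip from argv, or 0 if it is absent."""
    # Long options that take a value end with "=".
    opts, _ = getopt.getopt(
        argv[1:], '',
        ['filename=', 'kind=', 'url=', 'batch_size=', 'skip='])
    skip = 0
    for option, value in opts:
        if option == '--skip':
            skip = int(value)
    return skip
```

The parsed value would then be passed as the skip argument of ImportCSV. Defaulting to 0 keeps a first, non-resumed run working without the flag.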