
Learn how to improve the throughput and responsiveness of Oracle Database-backed Python applications with the help of threading and concurrency.

By Yuli Vasiliev

Published April 2009

With the trend toward more, rather than faster, cores, exploiting concurrency is increasing in importance. Concurrency represents a paradigm shift in programming, making it possible to write asynchronous code that separates tasks into a set of threads or processes working in parallel. If you are not new to programming and are somewhat familiar with C or C++, you probably already have some idea of threads and processes and know how they differ from each other: threads provide a lightweight alternative to processes when it comes to concurrent programming, which makes multithreading preferable to multiprocessing in most cases. This article therefore discusses the multithreaded approach to concurrency.

As with many other programming languages, separating CPU-intensive tasks into multiple processes in Python (this can be done with the multiprocessing module of the Python standard library) may give you a performance benefit on a multiple-CPU machine, because multiple processes can really run operations in parallel rather than merely switching between tasks while performing only one at any given time, as happens on a single-processor machine. In contrast, you may not notice any performance improvement when moving a multithreaded Python program to a multiple-CPU machine, because of the global interpreter lock (GIL), which Python uses to protect its internal data structures by ensuring that only one thread at a time runs in the CPython virtual machine.
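
As a brief aside, the following minimal sketch (not part of this article’s database examples; the sum_of_squares worker and its inputs are invented purely for illustration) shows how a CPU-bound task might be spread across processes with the multiprocessing module:

#File: multiprocsketch.py
#A hypothetical sketch: spreading CPU-bound work over a pool of processes

from multiprocessing import Pool

def sum_of_squares(n):
    #CPU-bound work that benefits from running in a separate process
    return sum(i * i for i in xrange(n))

if __name__ == '__main__':
    pool = Pool(processes=4)  #for example, one worker process per core
    #each input is handled by a separate worker process, truly in parallel
    results = pool.map(sum_of_squares, [10 ** 6, 2 * 10 ** 6, 3 * 10 ** 6])
    pool.close()
    pool.join()
    print results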

However, you may still be interested in adding threads to a database-backed Python program to speed it up. The point is that the underlying database your Python program interacts with is most likely installed on a high-performance server that processes submitted queries in parallel. This means that you can benefit from submitting several queries to the database server in separate threads in parallel, rather than issuing them sequentially, one after another, in a single thread.

One word of warning: although utilizing the natural parallelism of tasks may significantly improve application performance, it’s important to realize that not all tasks can be performed in parallel. For example, you cannot issue confirmation e-mails to customers until the operation they requested—say, a money transfer—is complete. It’s fairly obvious that such tasks must be performed sequentially in a certain order.

Another important thing to keep in mind when building multithreaded code is that some threads running in parallel may try to change shared objects at the same time, which may lead to lost or malformed data or even corruption of the object being changed. To avoid this problem, you need to control access to shared objects so that only one thread can use such an object at a time. Fortunately, Python lets you implement a locking mechanism to synchronize access to shared objects, utilizing the locking tools available in the threading module.

The downside to using locks is that they hurt scalability. When designing for scalability, it’s important to remember that putting a lock on a resource within one thread makes that resource unavailable to all the other running threads and processes until the lock is released. Thus, to ensure efficient resource management, you should not overuse locks: avoid them whenever possible, and release them as soon as possible when they are required.

Luckily, you don’t need to worry about locking when you’re dealing with resources stored in an Oracle database. This is because Oracle Database uses its own locking mechanisms behind the scenes when it comes to providing access to shared data in concurrent environments. So it’s often a good idea to keep shared data in the database, thus letting Oracle Database take care of the concurrency problems.

Another good way to achieve scalability and benefit from concurrency is to perform operations asynchronously. In asynchronous programming, the blocking code is queued for completion later by a separate thread, enabling your application to proceed with other tasks. Using an asynchronous framework such as Twisted may greatly simplify the task of building asynchronous applications.

This article provides a brief introduction to building concurrent applications with Python and Oracle Database, describing how to utilize threads in Python code interacting with an Oracle database and explaining how to submit SQL queries to the database server in parallel instead of processing them sequentially. You will also learn how to make Oracle Database take care of concurrency problems as well as how to take advantage of Twisted, a Python event-driven framework.

Multithreaded Programming in Python

Threads are a very useful feature when it comes to parallel processing. If you have a program that is performing time-consuming operations and can divide it into several independent tasks to be performed in parallel, using threads can help you build more-efficient, faster code. Another interesting use of multithreading can be to improve the responsiveness of your application—the main program remains responsive while time-consuming operations are performed in the background.

Enclosing long-running SQL statements in separate threads in Python can be a good idea when these statements do not depend on each other and can be executed in parallel. For example, you might significantly reduce the loading time of a Web page if it submits its initial SQL queries to the database server in parallel instead of issuing them sequentially, where each query must wait for the previous one to finish.

Another situation in which you might find threads useful is if you have to upload some large objects (LOBs) into the database. Doing this in parallel can not only reduce the overall time it takes to upload your LOBs into the database but can also keep the main thread of the program responsive while parallel uploading is happening in the background.

Suppose you need to upload a couple of binary large objects (BLOBs) to the database and save them to the blob_tab table that you might have created in a custom database schema, as follows:

CREATE TABLE blob_tab(
   id NUMBER PRIMARY KEY,
   blobdoc BLOB
);

CREATE SEQUENCE blob_seq;

First, let’s look at how you might store BLOBs into the blob_tab table one after another, without utilizing threads. The following Python script serves this purpose, persisting two input images obtained by file name and by URL, respectively. The example assumes that you have created the blob_tab table and the blob_seq sequence in a custom database schema accessed with the usr/pswd credentials:

#File: singlethread.py
#Storing BLOBs in a single thread sequentially, one after another

import cx_Oracle
from urllib import urlopen

inputs = []
#if you’re a Windows user, the path could be 'c:/temp/figure1.bmp'
inputs.append(open('/tmp/figure1.bmp', 'rb'))
inputs.append(urlopen(''))  #pass the URL of the second image (figure2.bmp) here
#obtaining a connection and predefining a memory area for a BLOB
dbconn = cx_Oracle.connect('usr', 'pswd', '127.0.0.1/XE')
dbconn.autocommit = True
cur = dbconn.cursor()
cur.setinputsizes(blobdoc=cx_Oracle.BLOB)
#executing INSERT statements saving BLOBs to the database
for input in inputs:
   blobdoc = input.read()
   cur.execute("INSERT INTO blob_tab (ID, BLOBDOC) VALUES(blob_seq.NEXTVAL, :blobdoc)", {'blobdoc':blobdoc})
   input.close()
dbconn.close()

Although the task of obtaining and storing figure1.bmp and the similar task for figure2.bmp are performed one after another here, these tasks, as you might guess, are not actually sequentially dependent. So you might refactor the above code so that it reads and stores each image within a separate thread, thus improving performance through parallel processing. It’s important to note that in this particular case, you won’t need to coordinate the threads running in parallel, which significantly simplifies coding.

The following example shows how you might rewrite the above script to use threads, utilizing an object-oriented approach. In particular, it illustrates how you can extend the Thread class from the threading module, customizing it for a particular task.

#File: multithread.py
#Storing BLOBs in separate threads in parallel

import cx_Oracle
import threading
from urllib import urlopen

#subclass of threading.Thread
class AsyncBlobInsert(threading.Thread):
  def __init__(self, cur, input):
    threading.Thread.__init__(self)
    self.cur = cur
    self.input = input
  def run(self):
    blobdoc = self.input.read()
    self.cur.execute("INSERT INTO blob_tab (ID, BLOBDOC) VALUES(blob_seq.NEXTVAL, :blobdoc)", {'blobdoc':blobdoc})
    self.input.close()
    self.cur.close()
#main thread starts here
inputs = []
inputs.append(open('/tmp/figure1.bmp', 'rb'))
inputs.append(urlopen(''))  #pass the URL of the second image (figure2.bmp) here
dbconn = cx_Oracle.connect('usr', 'pswd', '127.0.0.1/XE',threaded=True)
dbconn.autocommit = True
for input in inputs:
   cur = dbconn.cursor()
   cur.setinputsizes(blobdoc=cx_Oracle.BLOB)
   th = AsyncBlobInsert(cur, input)
   th.start()

In the above code, note the use of the threaded attribute passed to the cx_Oracle.connect method as a parameter. By setting it to true, you instruct cx_Oracle to initialize the Oracle client libraries in OCI_THREADED mode, also known as threaded mode, thus specifying that the application is running in a multithreaded environment. It’s worth noting that using threaded mode with a single-threaded application would not be a good idea: according to the cx_Oracle documentation, setting the threaded parameter to true in a single-threaded application may impose a performance penalty of 10 to 15 percent.

In this example, you share a single connection between two threads but create a separate cursor object for each thread. The operation of reading a BLOB and then inserting it into the database is implemented within the overridden run method of AsyncBlobInsert, a custom subclass of the standard threading.Thread class. So all you need to do to start uploading a BLOB in a separate thread is create an AsyncBlobInsert instance and then call its start method.

There is one problem with the script discussed here: when executed, it does not wait for the threads it launches to complete; the main thread finishes after starting the child threads, without waiting for them. If this is not the desired behavior and you want the program to complete only when all the threads have completed, you might call the join method of each AsyncBlobInsert instance at the end of the script. This blocks the main thread, making it wait for the child threads to complete. Here is how you might modify the preceding script so that it waits for all the threads launched in the for loop to finish:

...
th = []
for i, input in enumerate(inputs):
   cur = dbconn.cursor()
   cur.setinputsizes(blobdoc=cx_Oracle.BLOB)
   th.append(AsyncBlobInsert(cur, input))
   th[i].start()
#main thread waits until all child threads are done
for t in th:
   t.join()

The next section provides an example in which forcing the main thread to wait for completion of the child threads is required.

Synchronizing Access to Shared Resources

The preceding example showed a multithreaded Python application dealing with a couple of tasks that did not depend on each other and thus could be easily decoupled and put into separate threads to be processed in parallel. In practice, though, you often have to deal with operations that depend on each other and require synchronization at some point.

Being part of a single process, threads share the same global memory and therefore can pass information to each other through shared resources such as variables, class instances, streams, and files. However, this simple way of exchanging information between threads comes at a price—you really need to be careful when it comes to modifying an object that can be accessed and/or modified in another thread at the same time. So it would be useful to be able to avoid collisions, employing a mechanism for synchronizing access to shared data.

To help with this problem, Python lets you allocate a lock that can then be acquired by a thread to ensure exclusive access to the data structures you work with in that thread. The threading module provides the Lock() factory function, which you can use to allocate a lock. Be warned, though, that a lock allocated with threading.Lock is initially in an unlocked state. To lock it, you have to explicitly call the lock object’s acquire method. After that, you can perform operations on the objects that need protection. For example, you might use a lock when writing to the stdout standard output stream in a thread, to avoid potential overlap with other threads working with stdout. Once you’ve done that, you have to release the lock with the lock object’s release method, thus making the protected data structures available for further processing in other threads.
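
The following minimal sketch (not one of this article’s scripts; the report function and its messages are invented for illustration) shows the acquire/release pattern just described, serializing writes to stdout across threads:

#A hypothetical sketch: using threading.Lock to serialize writes to stdout

import threading

stdout_lock = threading.Lock()  #allocated in an unlocked state

def report(name, message):
    stdout_lock.acquire()       #wait here if another thread holds the lock
    try:
        print '%s: %s' % (name, message)
    finally:
        stdout_lock.release()   #always release so other threads can proceed

threads = [threading.Thread(target=report, args=('worker-%d' % i, 'done'))
           for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()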

An interesting thing about locks is that they are not bound to a single thread. A lock allocated within one thread can then be acquired by another and released by a third thread. The following script represents a simple example of locks in action. Here you allocate a lock in the main thread to subsequently utilize it in the child threads, acquiring it before writing to a DOM document and then immediately releasing it.

#File: synchmultithread.py
#Using locks for synchronization in a multithreaded script

import sys
import cx_Oracle
import threading
from xml.dom.minidom import parseString
from urllib import urlopen

#subclass of threading.Thread
class SynchThread(threading.Thread):
   def __init__(self, cur, query, dom):
     threading.Thread.__init__(self)
     self.cur = cur
     self.query = query[1]
     self.tag = query[0]
     self.dom = dom
   def run(self):
     self.cur.execute(self.query)
     rslt = self.cur.fetchone()[0]
     self.cur.close()
     mutex.acquire()
     sal = self.dom.getElementsByTagName('salary')[0]
     newtag = self.dom.createElement(self.tag)
     newtext = self.dom.createTextNode('%s'%rslt)
     newtag.appendChild(newtext)
     sal.appendChild(newtag)
     mutex.release()
#main thread starts here
domdoc = parseString('<salary/>')  #root element to which the child threads append their results
dbconn = cx_Oracle.connect('hr', 'hr', '127.0.0.1/XE',threaded=True)
mutex = threading.Lock()
queries = {}
queries['avg'] = "SELECT AVG(salary) FROM employees"
queries['max'] = "SELECT MAX(salary) FROM employees"
th = []
for i, query in enumerate(queries.items()):
   cur = dbconn.cursor()
   th.append(SynchThread(cur, query, domdoc))
   th[i].start()
#forcing the main thread to wait until all child threads are done
for t in th:
   t.join()
#printing out the result xml document
domdoc.writexml(sys.stdout)

In the above script, you first create a Document Object Model (DOM) document object in the main thread and then modify this document in the child threads running in parallel, adding tags containing information obtained from the database. Here you use two simple queries against the employees table in the HR demonstration schema. To avoid potential collisions during parallel writes to the DOM object, each child thread acquires the lock allocated in the main thread before modifying the document. Once the lock is acquired by one child thread, the other cannot modify the DOM object until the first thread releases the lock.

Then you synchronize the updates made to the DOM object in the child threads with the main thread, calling the join method of each child thread object in the main thread. After that, you further process the DOM document object in the main thread. In this particular example, you simply write it to the stdout standard output stream.

As you might notice, the example shown here doesn’t actually discuss how to lock database access operations, such as issuing queries or updates against the same database table in parallel threads. The fact is that Oracle Database has its own powerful locking mechanisms to ensure data integrity in concurrent environments. Your job is to use those mechanisms correctly. The next section discusses how you can utilize Oracle Database features to control concurrent access to shared data, thus making the database take care of the concurrency problems.

Making Oracle Database Take Care of Concurrency

As mentioned, you don’t need to manually implement resource locking in your Python code when it comes to accessing or manipulating shared data stored in an Oracle database. To address concurrency issues, Oracle Database uses various types of locks and a multiversion concurrency control system behind the scenes, features that are based on the concept of a transaction. What this means in practice is that the only thing you have to worry about is using transactions correctly to guarantee that database data is accessed, updated, or changed properly. In particular, you must be careful when choosing between the autocommit and manual-commit transaction modes and when grouping several SQL statements into a transaction. Finally, you must avoid destructive interactions between concurrent transactions.

One important thing to remember here is that transactions you use in your Python code are associated with connections rather than cursors, meaning that you can easily logically group together statements executed with different cursors but through the same connection into a single transaction. However, if you want to implement two concurrent transactions, you will need to create two separate connection objects.
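
A minimal sketch may help make this distinction concrete (the t1 and t2 tables and the inserted values are hypothetical):

#A hypothetical sketch: transactions belong to connections, not cursors

import cx_Oracle

#two cursors on ONE connection: their statements form a single transaction
dbconn = cx_Oracle.connect('usr', 'pswd', '127.0.0.1/XE')
cur1 = dbconn.cursor()
cur2 = dbconn.cursor()
cur1.execute("INSERT INTO t1(id) VALUES (1)")
cur2.execute("INSERT INTO t2(id) VALUES (1)")
dbconn.commit()   #commits the work done through both cursors at once

#two connections: two independent, concurrent transactions
dbconn_a = cx_Oracle.connect('usr', 'pswd', '127.0.0.1/XE')
dbconn_b = cx_Oracle.connect('usr', 'pswd', '127.0.0.1/XE')
dbconn_a.cursor().execute("INSERT INTO t1(id) VALUES (2)")
dbconn_b.cursor().execute("INSERT INTO t2(id) VALUES (2)")
dbconn_a.commit()     #committing the first transaction...
dbconn_b.rollback()   #...has no effect on the second one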

In the multithreaded sample discussed in the “Multithreaded Programming in Python” section earlier, you set the autocommit mode of the connection object to true, thus instructing the cx_Oracle module to implicitly issue a COMMIT after each INSERT statement. In that particular case, using the autocommit mode was reasonable, because this let you avoid synchronization between the child threads and the main thread, which would otherwise be required so that you could manually issue a COMMIT in the main thread, as shown below:

...
#main thread waits until all child threads are done
for t in th:
   t.join()
#and then issues a commit
dbconn.commit()

There are cases, though, in which you have to use the above scenario. Suppose you perform the following two operations, each in its own thread: in one thread, you save a purchase order document, including the order details, into the database; in the other thread, you update the table containing information about the products included in the order, reducing the quantity of each product left available for purchase.

It’s fairly obvious that these two operations must be wrapped in a single transaction. To achieve this, you must have the autocommit mode off, which is the default. You will also have to synchronize the parallel threads with the main thread and then explicitly issue a COMMIT, as shown in the above code snippet.
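
Putting the pieces together, a minimal sketch of that scenario might look as follows (the order_details and products tables, their columns, and the sample values are hypothetical; both child threads obtain cursors from the same connection, so their statements fall into one transaction committed by the main thread):

#A hypothetical sketch: two parallel operations wrapped in a single transaction

import cx_Oracle
import threading

class StatementThread(threading.Thread):
    def __init__(self, cur, sql, params):
        threading.Thread.__init__(self)
        self.cur = cur
        self.sql = sql
        self.params = params
    def run(self):
        self.cur.execute(self.sql, self.params)
        self.cur.close()

dbconn = cx_Oracle.connect('usr', 'pswd', '127.0.0.1/XE', threaded=True)
dbconn.autocommit = False   #manual-commit mode (the default)

th = []
th.append(StatementThread(dbconn.cursor(),
    "INSERT INTO order_details(order_id, product_id, quantity) "
    "VALUES (:oid, :pid, :qty)",
    {'oid': 1, 'pid': 10, 'qty': 2}))
th.append(StatementThread(dbconn.cursor(),
    "UPDATE products SET quantity = quantity - :qty WHERE product_id = :pid",
    {'pid': 10, 'qty': 2}))
for t in th:
    t.start()
#synchronize the child threads with the main thread...
for t in th:
    t.join()
#...and only then commit the single shared transaction
dbconn.commit()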

Although the above scenario can easily be implemented, in practice you will most likely want to implement the second operation, updating the quantity of products left available for purchase, inside the database, placing a BEFORE INSERT trigger on the table storing order details so that it automatically updates the corresponding record in the products table (a sketch of such a trigger follows the snippet below). This simplifies the code on the Python side and eliminates the need to write a multithreaded Python script, letting Oracle Database take care of data integrity. The point is that Oracle Database will automatically roll back the insertion of a new row into the details table if a problem occurs while the products table is being updated within the trigger. On the Python side, all that’s left is to wrap in a transaction all the INSERTs used to save the details of an order, as follows:

...
dbconn = cx_Oracle.connect('hr', 'hr', '127.0.0.1/XE',threaded=True)
dbconn.autocommit = False
cur = dbconn.cursor()
...
for detail in details:
   id = detail['id']
   quantity = detail['quantity']
   cur.execute("INSERT INTO details(id, quantity) VALUES(:id, :quantity)", {'id':id, 'quantity':quantity})
dbconn.commit()
...
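
For reference, the BEFORE INSERT trigger described above might look something like the following sketch (the trigger name and the products table with its id and quantity columns are assumptions made for illustration; the details table matches the snippet above):

-- A hypothetical sketch of the trigger described above
CREATE OR REPLACE TRIGGER details_bi
BEFORE INSERT ON details
FOR EACH ROW
BEGIN
   UPDATE products
      SET quantity = quantity - :NEW.quantity
    WHERE id = :NEW.id;
END;
/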

Using Twisted, Python Event-Driven Framework

Twisted makes multithreaded programming in Python simpler and safer, providing a nice way of coding event-driven applications while hiding the complexity. The Twisted concurrency model is based on the concept of nonblocking calls. You call a function to request some data and specify a callback function to be called when the requested data is ready. In the meantime, the program can continue with other tasks.

The twisted.enterprise.adbapi module, an asynchronous wrapper for any DB-API-compatible Python module, enables you to perform database-related tasks in a nonblocking mode. With it, your application, for example, won’t wait until a connection to the database is established or a query is completed, instead performing other tasks in parallel. This section looks at a couple of simple examples of Twisted applications interacting with an Oracle database.

Twisted doesn’t come with Python; you need to download and install it after Python has been installed on your system. You can download the Twisted installation package suitable for your Python version and operating system from the Twisted Matrix Labs Web site. Once you have downloaded the package, the installation process, a matter of a couple of clicks in the Twisted setup wizard, takes about one minute.

Twisted is an event-driven framework, so it has an event loop that, once started, runs uninterrupted, waiting for events to dispatch. In Twisted, the event loop is implemented by an object called the reactor. You start the Twisted event loop with the reactor.run method and stop it with reactor.stop. Another Twisted object, called Deferred, is used to manage callbacks. The following is a simplified example of the Twisted event loop and a callback in action. The __name__ test guarantees that the code runs only when the module is executed as a main script rather than imported (that is, when it is launched from the command line, from the IDLE Python GUI, or with an icon click).

#File: twistedsimple.py
#A simple example of a Twisted app

from twisted.internet import reactor
from twisted.enterprise import adbapi

def printResult(rslt):
   print rslt[0][0]
   reactor.stop()

if __name__ == "__main__":
   dbpool = adbapi.ConnectionPool('cx_Oracle', user='hr', password ='hr', dsn='127.0.0.1/XE')
   empno = 100
   deferred = dbpool.runQuery("SELECT last_name FROM employees WHERE employee_id = :empno", {'empno':empno})
   deferred.addCallback(printResult)
   reactor.run()

It’s important to realize that the twisted.enterprise.adbapi module is built on top of the standard DB-API interface and, behind the scenes, utilizes the Python database module you specify when calling the adbapi.ConnectionPool constructor. Even the set of keyword arguments you can pass to adbapi.ConnectionPool depends on the database module you’re using.
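
For instance, adbapi interprets pool-level keywords such as cp_min, cp_max, and cp_reconnect itself and passes the remaining keywords straight through to the underlying module’s connect call, so with cx_Oracle you can also supply driver-specific keywords such as threaded. A minimal sketch (the pool sizes chosen here are arbitrary):

#A hypothetical sketch: pool-level keywords versus driver keywords
from twisted.enterprise import adbapi

dbpool = adbapi.ConnectionPool('cx_Oracle',
                               user='hr', password='hr', dsn='127.0.0.1/XE',
                               threaded=True,       #passed through to cx_Oracle.connect
                               cp_min=3, cp_max=5,  #interpreted by adbapi: pool size limits
                               cp_reconnect=True)   #interpreted by adbapi: reopen lost connections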

Despite some differences in syntax when it’s used with different Python database modules, twisted.enterprise.adbapi provides the ability to write asynchronous code that will continue with the flow of your program while safely processing database-related tasks in the background. The following example represents a simple Twisted Web application, querying the database asynchronously. This example assumes that you have created the blob_tab table and populated it with data, as discussed in the “Multithreaded Programming in Python” section at the beginning of the article.

#File: twistedTCPServer.py
#Querying database asynchronously with Twisted

from twisted.web import resource, server
from twisted.internet import reactor
from twisted.enterprise import adbapi

class BlobLoads(resource.Resource):
    def __init__(self, dbconn):
        self.dbconn = dbconn
        resource.Resource.__init__(self)
    def _getBlobs(self, txn, query):
        txn.execute(query)
        return txn.fetchall()
    def render_GET(self, request):
        query = "select id, blobdoc from blob_tab"
        self.dbconn.runInteraction(self._getBlobs, query).addCallback(
            self._writeBlobs, request).addErrback(
            self._exception, request)
        return server.NOT_DONE_YET
    def _writeBlobs(self, results, request):
        request.write("""<html>
        <head><title>BLOBs manipulating</title></head>
        <body>
        <h3>Writing BLOBs from the database to your disk</h3>
        """)
        for id, blobdoc in results:
            request.write("/tmp/picture%s.bmp<br>" % id)
            blob = blobdoc.read()
            output = open("/tmp/picture%s.bmp" % id, 'wb')
            output.write(blob)
            output.close()
        request.write("""
        <h3>Operation completed</h3>
        </body></html>""")
        request.finish()
    def _exception(self, error, request):
        request.write("Error obtaining BLOBs: %s" % error.getErrorMessage())
        request.write("""
        <h3>Could not complete operation</h3>
        """)
        request.finish()

class SiteResource(resource.Resource):
    def __init__(self, dbconn):
        resource.Resource.__init__(self)
        self.putChild('', BlobLoads(dbconn))

if __name__ == "__main__":
    dbconn = adbapi.ConnectionPool('cx_Oracle', user='usr', password='pswd', dsn='127.0.0.1/XE')
    site = server.Site(SiteResource(dbconn))
    print "Listening on port 8000"
    reactor.listenTCP(8000, site)
    reactor.run()

When executed, this script starts a TCP server listening on port 8000. Upon accepting a client connection, the script downloads all the images stored in the blob_tab table and saves them in the /tmp folder as separate files, sending appropriate messages back to the client. To test the application, run the script and then point your browser to http://localhost:8000.

The most interesting thing about the above code is that it runs the query issued against the database in a nonblocking mode, continuing with the flow of the program. To see that it really works this way, you might enhance the render_GET method by inserting some code below the call to runInteraction (the call that instructs Twisted to invoke _getBlobs asynchronously and then _writeBlobs). The newly inserted code should send something back to the client with the request.write method, so that you can see in the client’s browser that this output appears before the output generated within _writeBlobs.
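
Here is a sketch of such a modification; only the render_GET method of BlobLoads is shown, and the message text is arbitrary:

    def render_GET(self, request):
        query = "select id, blobdoc from blob_tab"
        self.dbconn.runInteraction(self._getBlobs, query).addCallback(
            self._writeBlobs, request).addErrback(
            self._exception, request)
        #this line runs immediately, before the query completes, so the
        #message below reaches the browser ahead of the _writeBlobs output
        request.write("Query submitted; fetching BLOBs in the background...")
        return server.NOT_DONE_YET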

Conclusion

Nowadays concurrency is frequently used in data-intensive applications, and using it efficiently is key to improving application performance. One of the most efficient ways to write concurrent programs is to use multiple threads. As you have learned in this article, though, multithreading in Python does not let CPU-bound code take advantage of multiple processors, because of the global interpreter lock (GIL). However, you can still benefit from using multiple threads when it comes to developing database-intensive code as well as asynchronous, event-driven code.

This article is a good starting point for a tour of concurrency, providing you with background information to help you decide how best to use concurrency when designing Oracle Database-backed Python applications.
