发布于 2015-11-16 07:57:52 | 566 次阅读 | 评论: 1 | 来源: PHPERZ
Python编程语言
Python 是一种面向对象、解释型计算机程序设计语言,由Guido van Rossum于1989年底发明,第一个公开发行版发行于1991年。Python语法简洁而清晰,具有丰富和强大的类库。它常被昵称为胶水语言,它能够把用其他语言制作的各种模块(尤其是C/C++)很轻松地联结在一起。
在 Ubuntu 上运行 Python 脚本连接 PostgreSQL(pg)时,报错:ImportError: No module named psycopg2
root@pgproxy1:~# python /home/zxw/PGWriterTest_m.py
Traceback (most recent call last):
File "/home/zxw/PGWriterTest_m.py", line 4, in <module>
import psycopg2
ImportError: No module named psycopg2
步骤 1:在软件源中搜索可用的 psycopg2 软件包
root@pgproxy1:~# apt-cache search psycopg2
python-psycopg2 - Python module for PostgreSQL
python-psycopg2-dbg - Python module for PostgreSQL (debug extension)
python-psycopg2-doc - Python module for PostgreSQL (documentation package)
python3-psycopg2 - Python 3 module for PostgreSQL
python3-psycopg2-dbg - Python 3 module for PostgreSQL (debug extension)
步骤 2:确认当前的 Python 版本(Python 2 对应 python-psycopg2,Python 3 对应 python3-psycopg2)
root@pgproxy1:~# python -V
Python 2.7.3
步骤 3:安装与 Python 2 对应的 psycopg2 软件包
root@pgproxy1:~# apt-get install -y python-psycopg2 python-psycopg2-doc python-psycopg2-dbg
Reading package lists... Done
Building dependency tree
Reading state information... Done
...
Processing triggers for libc-bin ...
ldconfig deferred processing now taking place
root@pgproxy1:~#
步骤 4:安装完成后重新运行脚本。
脚本如下:
# --encoding:utf-8--
import time
import threading
import psycopg2
import Queue
import datetime
关于 psycopg2 接口的更多说明,可以参考其官网:
http://initd.org/psycopg/
class PGWriterTest(threading.Thread):
    """Read rows from one PostgreSQL connection and write them to several others.

    One "source" connection is opened from the connection string given to the
    constructor; ``dbnum`` additional "target" connections are opened on
    consecutive ports starting at 9900.  ``save`` picks the target with
    ``t_id % dbnum``.

    The class derives from ``threading.Thread`` but does not define ``run``.
    """

    def __init__(self, connstr):
        """Open the source connection and one connection per target instance.

        Args:
            connstr: connection string passed to ``psycopg2.connect`` for the
                source database used by ``read``.
        """
        # Run the parent constructor.
        threading.Thread.__init__(self)

        self.conn = psycopg2.connect(connstr)
        self.cursor = self.conn.cursor()
        self.dbnum = 4
        self.connArray = []
        self.cursorArray = []
        for i in range(self.dbnum):
            # DB port: targets listen on 9900, 9901, ...
            dbidstr = '%02d' % (9900 + i)
            # Targets 0 and 2 are on ip1, the remaining ones on ip2.
            if i == 0 or i == 2:
                dstDB = 'host=ip1 user=dbusername password=pwd dbname=dbname port=' + dbidstr
            else:
                dstDB = 'host=ip2 user=dbusername password=pwd dbname=dbname port=' + dbidstr
            print('connect ' + dstDB)
            dstConn = psycopg2.connect(dstDB)
            self.connArray.append(dstConn)
            self.cursorArray.append(dstConn.cursor())

    def read(self, t_id):
        """Return all rows produced by ``SELECT * from get_cont(t_id)``.

        Args:
            t_id: identifier handed to the database function ``get_cont``.

        Returns:
            A list with every row fetched from the source cursor.
        """
        # Bind t_id as a query parameter instead of concatenating it into the
        # SQL text, so the driver takes care of quoting.
        self.cursor.execute('SELECT * from get_cont(%s)', (t_id,))
        return list(self.cursor.fetchall())

    def save(self, t_id, data):
        """Write one row to the target database selected by ``t_id % dbnum``.

        Args:
            t_id: identifier of the row; also decides which target is used.
            data: indexable row; ``data[1]`` and ``data[2]`` are sent as the
                second and third fields of the value passed to ``write_cont``.

        Returns:
            True when the statement ran and was committed, False when it
            raised (the transaction is rolled back in that case).
        """
        # psycopg2 placeholders must always be %s, and the number of
        # parameters must match the number of placeholders.  The fourth field
        # is supplied by NOW() in the statement itself.
        # NOTE(review): the original also passed data[3] although the
        # statement has no fourth placeholder -- confirm it is not needed.
        insert_sql = 'SELECT write_cont((%s,character(255) %s,%s,NOW()::timestamp))'
        params = [t_id, data[1], data[2]]

        # Pick the target instance (db_ins_id).
        db_ins_id = t_id % self.dbnum
        target_conn = self.connArray[db_ins_id]
        target_cursor = self.cursorArray[db_ins_id]
        try:
            print(str(datetime.datetime.now()) + " DB " + str(db_ins_id) + " " + str(t_id) + " before insert")
            target_cursor.execute(insert_sql, params)
            target_conn.commit()
            print(str(datetime.datetime.now()) + " DB " + str(db_ins_id) + " " + str(t_id) + " inserted")
            return True
        except Exception as ex:
            print("save error:%s" % str(ex))
            print("save t_id:%s\t" % str(t_id))
            print("error cont:%s" % str(params))
            target_conn.rollback()
            return False
# Script entry point: copy the first row returned for t_id 1001..1009 from the
# source database to the target instances, printing timestamps around each
# step and the overall start/end time at the end.
if __name__ == "__main__":
    pgTest = PGWriterTest('host=ip user=DBUser password=pwd dbname=DBNAme port=9999')
    start_time = str(datetime.datetime.now())
    for i in range(1, 10):
        t_id = 1000 + i
        print(str(datetime.datetime.now()) + " getting ")
        rows = pgTest.read(t_id)
        print(str(datetime.datetime.now()) + " gotten ")
        if not rows:
            # Nothing was returned for this id; skip it instead of failing
            # with an IndexError on rows[0].
            print("no row found for t_id:%s" % str(t_id))
            continue
        pgTest.save(t_id, rows[0])
    print(start_time)
    print(str(datetime.datetime.now()))