Python 的 pandas 库中,read_sql_query() 函数是一种非常有用的方法,可以直接从数据库执行 SQL 查询并将结果作为 DataFrame 对象返回。本文主要介绍使用pandas.read_sql_query()一些常用操作示例demo代码。

1、测试数据库连接问题代码

def test_connectable_issue_example(self):
        # This tests the example raised in issue
        # https://github.com/pydata/pandas/issues/10104
        def foo(connection):
            query = 'SELECT test_foo_data FROM test_foo_data'
            return sql.read_sql_query(query, con=connection)
        def bar(connection, data):
            data.to_sql(name='test_foo_data',
                        con=connection, if_exists='append')
        def main(connectable):
            with connectable.connect() as conn:
                with conn.begin():
                    foo_data = conn.run_callable(foo)
                    conn.run_callable(bar, foo_data)
        DataFrame({'test_foo_data': [0, 1, 2]}).to_sql(
            'test_foo_data', self.conn)
        main(self.conn) 

2、临时表相关操作示例代码

def test_temporary_table(self):
test_data = u'Hello, World!'
expected = DataFrame({'spam': [test_data]})
Base = declarative.declarative_base()
class Temporary(Base):
__tablename__ = 'temp_test'
__table_args__ = {'prefixes': ['TEMPORARY']}
id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
spam = sqlalchemy.Column(sqlalchemy.Unicode(30), nullable=False)
Session = sa_session.sessionmaker(bind=self.conn)
session = Session()
with session.transaction:
conn = session.connection()
Temporary.__table__.create(conn)
session.add(Temporary(spam=test_data))
session.flush()
df = sql.read_sql_query(
sql=sqlalchemy.select([Temporary.spam]),
con=conn,
)
tm.assert_frame_equal(df, expected)

3、查看表的索引相关代码

def test_connectable_issue_example(self):
        # This tests the example raised in issue
        # https://github.com/pydata/pandas/issues/10104
        def foo(connection):
            query = 'SELECT test_foo_data FROM test_foo_data'
            return sql.read_sql_query(query, con=connection)
        def bar(connection, data):
            data.to_sql(name='test_foo_data',
                        con=connection, if_exists='append')
        def main(connectable):
            with connectable.connect() as conn:
                with conn.begin():
                    foo_data = conn.run_callable(foo)
                    conn.run_callable(bar, foo_data)
        DataFrame({'test_foo_data': [0, 1, 2]}).to_sql(
            'test_foo_data', self.conn)
        main(self.conn) 

4、获取数据相关操作

def get_data(query):
"""
Pulls data from the db based on the query
Input
-----
query: str
SQL query from the database
Output
------
data: DataFrame
Dump of Query into a DataFrame
"""
from setup_environment import db_dict
with setup_environment.connect_to_syracuse_db(**db_dict) as conn:
data = pd.read_sql_query(query, conn)
return data

5、显示hexbin数据

def show_hexbin(self, query):
"""shows hexbin plot over map
Args:
query: name of sql
"""
self.load()
data = pd.read_sql_query(con=self.con, sql=query)
points = self.gen_points(data, self.data_map)
hx = self.base_map.hexbin(
np.array([geom.x for geom in points]),
np.array([geom.y for geom in points]),
gridsize=275,
bins='log',
mincnt=1,
edgecolor='none',
alpha=1.,
lw=0.2,
cmap=plt.get_cmap('afmhot'))
plt.tight_layout()
plt.show()

6、显示scatter数据

def show_scatter(self, query, color='blue'):
self.load()
"""shows scatter plot over map
Args:
query: name of sql
"""
data = pd.read_sql_query(con=self.con, sql=query)
points = self.gen_points(data, self.data_map)
plot = self.base_map.scatter(
[point.x for point in points],
[point.y for point in points],
10, marker='o', lw=.25,
facecolor=color, edgecolor='w',
alpha=0.9, antialiased=True,
zorder=3)
plt.show()

7、数据集分析

def DataSetAnalytes(self, data_set, TableProcessingToReturn='Both_PeakFinding_TargetAnalyteFinding'):
	#查询样例表中的所有外键列,并返回给定数据集中发现的所有分析的列表。
	column_string = self.createQueryColumnsStr(TableProcessingToReturn)
	#为给定数据集的所有外键列构建SQL语句和查询示例表
	sql_statement = "SELECT %s FROM Sample WHERE Sample.DataSetName = '%s';" % (column_string, data_set)		
	df = pd.read_sql_query(sql_statement, self.conn)
	return self.GetFoundAnalytesLst(df)

8、清除数据集数据

def ClearDataSetData(self, data_sets, analyte_table_lst):
        #为定义数据集清除数据库中的所有数据
	#追加数据集浓度表,以便它也将包括在数据清理中
	analyte_table_lst.append('DataSetConcentrations')
	#创建一个包含示例表中所有外键的字符串
        #需要删除的数据(即逗号分隔的空格)
	ForeignKeyColumn_lst = [col + '_foreignkey' for col in analyte_table_lst]
	ForeignKeyColumn_Columns = ', '.join(ForeignKeyColumn_lst)
	#从数据集获取条件字符串
	data_set_condition = self.CreateConditionClause_OrSeriesStr(data_sets, "DataSetName")
	# Resulting df: 列对应于包含要删除数据的表,每一列中的值是表中需要删除的记录的主键。
	sql_statement = 'SELECT %s FROM Sample WHERE %s;' % (ForeignKeyColumn_Columns, data_set_condition)
	df = pd.read_sql_query(sql_statement, self.conn)
	df.columns = analyte_table_lst
	# 按列迭代dataframe以从数据库中删除每个记录
	# note: column = table name
	for column in df:
		condition = self.CreateConditionClause_OrSeriesStr(set(df[column]), "id")
		self.conn.execute('DELETE FROM %s WHERE %s' % (column, condition))
        #最后,还要从示例表中删除定义的记录
	self.conn.execute('DELETE FROM Sample WHERE %s' % data_set_condition)
	self.CommitDB() 

推荐文档

相关文档

大家感兴趣的内容

随机列表