1、测试数据库连接问题代码
def test_connectable_issue_example(self):
# This tests the example raised in issue
# https://github.com/pydata/pandas/issues/10104
def foo(connection):
query = 'SELECT test_foo_data FROM test_foo_data'
return sql.read_sql_query(query, con=connection)
def bar(connection, data):
data.to_sql(name='test_foo_data',
con=connection, if_exists='append')
def main(connectable):
with connectable.connect() as conn:
with conn.begin():
foo_data = conn.run_callable(foo)
conn.run_callable(bar, foo_data)
DataFrame({'test_foo_data': [0, 1, 2]}).to_sql(
'test_foo_data', self.conn)
main(self.conn)
2、临时表相关操作示例代码
def test_temporary_table(self):
test_data = u'Hello, World!'
expected = DataFrame({'spam': [test_data]})
Base = declarative.declarative_base()
class Temporary(Base):
__tablename__ = 'temp_test'
__table_args__ = {'prefixes': ['TEMPORARY']}
id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
spam = sqlalchemy.Column(sqlalchemy.Unicode(30), nullable=False)
Session = sa_session.sessionmaker(bind=self.conn)
session = Session()
with session.transaction:
conn = session.connection()
Temporary.__table__.create(conn)
session.add(Temporary(spam=test_data))
session.flush()
df = sql.read_sql_query(
sql=sqlalchemy.select([Temporary.spam]),
con=conn,
)
tm.assert_frame_equal(df, expected)
3、查看表的索引相关代码
def test_connectable_issue_example(self):
# This tests the example raised in issue
# https://github.com/pydata/pandas/issues/10104
def foo(connection):
query = 'SELECT test_foo_data FROM test_foo_data'
return sql.read_sql_query(query, con=connection)
def bar(connection, data):
data.to_sql(name='test_foo_data',
con=connection, if_exists='append')
def main(connectable):
with connectable.connect() as conn:
with conn.begin():
foo_data = conn.run_callable(foo)
conn.run_callable(bar, foo_data)
DataFrame({'test_foo_data': [0, 1, 2]}).to_sql(
'test_foo_data', self.conn)
main(self.conn)
4、获取数据相关操作
def get_data(query):
"""
Pulls data from the db based on the query
Input
-----
query: str
SQL query from the database
Output
------
data: DataFrame
Dump of Query into a DataFrame
"""
from setup_environment import db_dict
with setup_environment.connect_to_syracuse_db(**db_dict) as conn:
data = pd.read_sql_query(query, conn)
return data
5、显示hexbin数据
def show_hexbin(self, query):
"""shows hexbin plot over map
Args:
query: name of sql
"""
self.load()
data = pd.read_sql_query(con=self.con, sql=query)
points = self.gen_points(data, self.data_map)
hx = self.base_map.hexbin(
np.array([geom.x for geom in points]),
np.array([geom.y for geom in points]),
gridsize=275,
bins='log',
mincnt=1,
edgecolor='none',
alpha=1.,
lw=0.2,
cmap=plt.get_cmap('afmhot'))
plt.tight_layout()
plt.show()
6、显示scatter数据
def show_scatter(self, query, color='blue'):
self.load()
"""shows scatter plot over map
Args:
query: name of sql
"""
data = pd.read_sql_query(con=self.con, sql=query)
points = self.gen_points(data, self.data_map)
plot = self.base_map.scatter(
[point.x for point in points],
[point.y for point in points],
10, marker='o', lw=.25,
facecolor=color, edgecolor='w',
alpha=0.9, antialiased=True,
zorder=3)
plt.show()
7、数据集分析
def DataSetAnalytes(self, data_set, TableProcessingToReturn='Both_PeakFinding_TargetAnalyteFinding'):
#查询样例表中的所有外键列,并返回给定数据集中发现的所有分析的列表。
column_string = self.createQueryColumnsStr(TableProcessingToReturn)
#为给定数据集的所有外键列构建SQL语句和查询示例表
sql_statement = "SELECT %s FROM Sample WHERE Sample.DataSetName = '%s';" % (column_string, data_set)
df = pd.read_sql_query(sql_statement, self.conn)
return self.GetFoundAnalytesLst(df)
8、清除数据集数据
def ClearDataSetData(self, data_sets, analyte_table_lst):
#为定义数据集清除数据库中的所有数据
#追加数据集浓度表,以便它也将包括在数据清理中
analyte_table_lst.append('DataSetConcentrations')
#创建一个包含示例表中所有外键的字符串
#需要删除的数据(即逗号分隔的空格)
ForeignKeyColumn_lst = [col + '_foreignkey' for col in analyte_table_lst]
ForeignKeyColumn_Columns = ', '.join(ForeignKeyColumn_lst)
#从数据集获取条件字符串
data_set_condition = self.CreateConditionClause_OrSeriesStr(data_sets, "DataSetName")
# Resulting df: 列对应于包含要删除数据的表,每一列中的值是表中需要删除的记录的主键。
sql_statement = 'SELECT %s FROM Sample WHERE %s;' % (ForeignKeyColumn_Columns, data_set_condition)
df = pd.read_sql_query(sql_statement, self.conn)
df.columns = analyte_table_lst
# 按列迭代dataframe以从数据库中删除每个记录
# note: column = table name
for column in df:
condition = self.CreateConditionClause_OrSeriesStr(set(df[column]), "id")
self.conn.execute('DELETE FROM %s WHERE %s' % (column, condition))
#最后,还要从示例表中删除定义的记录
self.conn.execute('DELETE FROM Sample WHERE %s' % data_set_condition)
self.CommitDB()