1、测试数据库连接问题代码
def test_connectable_issue_example(self):
    """Regression test for the example raised in
    https://github.com/pydata/pandas/issues/10104: reading and writing
    a table through callables executed on a SQLAlchemy connectable.
    """
    def read_back(connection):
        # Read the whole test table through the supplied connection.
        query = 'SELECT test_foo_data FROM test_foo_data'
        return sql.read_sql_query(query, con=connection)

    def append_rows(connection, data):
        # Append the frame back into the same table.
        data.to_sql(name='test_foo_data', con=connection, if_exists='append')

    def roundtrip(connectable):
        # Run both callables inside a single transaction on one connection.
        with connectable.connect() as conn:
            with conn.begin():
                fetched = conn.run_callable(read_back)
                conn.run_callable(append_rows, fetched)

    DataFrame({'test_foo_data': [0, 1, 2]}).to_sql(
        'test_foo_data', self.conn)
    roundtrip(self.conn)
2、临时表相关操作示例代码
def test_temporary_table(self):
    """Round-trip a unicode value through a TEMPORARY table created and
    queried inside a single transaction, then compare the frame read back.
    """
    test_data = u'Hello, World!'
    expected = DataFrame({'spam': [test_data]})
    Base = declarative.declarative_base()

    class Temporary(Base):
        __tablename__ = 'temp_test'
        # TEMPORARY prefix: the table only lives for this session.
        __table_args__ = {'prefixes': ['TEMPORARY']}
        id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
        spam = sqlalchemy.Column(sqlalchemy.Unicode(30), nullable=False)

    Session = sa_session.sessionmaker(bind=self.conn)
    session = Session()
    with session.transaction:
        conn = session.connection()
        Temporary.__table__.create(conn)
        session.add(Temporary(spam=test_data))
        session.flush()
        # Read inside the same transaction so the temporary row is visible.
        df = sql.read_sql_query(
            sql=sqlalchemy.select([Temporary.spam]),
            con=conn,
        )
        tm.assert_frame_equal(df, expected)
3、测试数据库连接问题代码(与第1节示例相同;原标题“查看表的索引相关代码”与下方代码内容不符)
def test_connectable_issue_example(self):
    """Regression test for the example raised in
    https://github.com/pydata/pandas/issues/10104: reading and writing
    a table through callables executed on a SQLAlchemy connectable.
    """
    def fetch_table(connection):
        # Select every row of the test table via the given connection.
        return sql.read_sql_query(
            'SELECT test_foo_data FROM test_foo_data', con=connection)

    def write_table(connection, data):
        # Append the fetched frame back into the same table.
        data.to_sql(name='test_foo_data', con=connection, if_exists='append')

    def run_example(connectable):
        # Both callables share one connection and one transaction.
        with connectable.connect() as conn:
            with conn.begin():
                result = conn.run_callable(fetch_table)
                conn.run_callable(write_table, result)

    DataFrame({'test_foo_data': [0, 1, 2]}).to_sql(
        'test_foo_data', self.conn)
    run_example(self.conn)
4、获取数据相关操作
def get_data(query):
    """Pull data from the database based on *query*.

    Parameters
    ----------
    query : str
        SQL query to run against the database.

    Returns
    -------
    data : DataFrame
        Result of the query as a DataFrame.
    """
    # Bug fix: the original imported only ``db_dict`` but then referenced
    # ``setup_environment.connect_to_syracuse_db`` without importing the
    # module itself, which raises NameError. Import both names explicitly.
    from setup_environment import db_dict, connect_to_syracuse_db
    with connect_to_syracuse_db(**db_dict) as conn:
        data = pd.read_sql_query(query, conn)
    return data
5、显示hexbin数据
def show_hexbin(self, query):
    """Show a hexbin density plot of the query's points over the base map.

    Parameters
    ----------
    query : str
        SQL query whose result rows are converted to map points.
    """
    self.load()
    data = pd.read_sql_query(con=self.con, sql=query)
    points = self.gen_points(data, self.data_map)
    # The PolyCollection returned by hexbin was bound to an unused local
    # (``hx``) in the original; the binding has been dropped.
    self.base_map.hexbin(
        np.array([geom.x for geom in points]),
        np.array([geom.y for geom in points]),
        gridsize=275,
        bins='log',        # log-scaled bin counts
        mincnt=1,          # only draw cells with at least one point
        edgecolor='none',
        alpha=1.,
        lw=0.2,
        cmap=plt.get_cmap('afmhot'))
    plt.tight_layout()
    plt.show()
6、显示scatter数据
def show_scatter(self, query, color='blue'):
    """Show a scatter plot of the query's points over the base map.

    Parameters
    ----------
    query : str
        SQL query whose result rows are converted to map points.
    color : str, default 'blue'
        Face color of the scatter markers.
    """
    # Bug fix: the original placed the docstring AFTER ``self.load()``,
    # making it a no-op string expression rather than the function's
    # docstring. It is now the first statement. The unused local
    # ``plot`` binding was also dropped.
    self.load()
    data = pd.read_sql_query(con=self.con, sql=query)
    points = self.gen_points(data, self.data_map)
    self.base_map.scatter(
        [point.x for point in points],
        [point.y for point in points],
        10, marker='o', lw=.25,
        facecolor=color, edgecolor='w',
        alpha=0.9, antialiased=True,
        zorder=3)
    plt.show()
7、数据集分析
def DataSetAnalytes(self, data_set,
                    TableProcessingToReturn='Both_PeakFinding_TargetAnalyteFinding'):
    """Query all foreign-key columns of the Sample table and return the
    list of analytes found in the given data set.
    """
    # Comma-separated column list for the requested processing mode.
    column_string = self.createQueryColumnsStr(TableProcessingToReturn)
    # Build and run the SQL statement querying the Sample table for every
    # foreign-key column belonging to this data set.
    # NOTE(review): data_set is interpolated directly into the SQL string;
    # acceptable for trusted internal names, but not injection-safe.
    sql_statement = (
        "SELECT %s FROM Sample WHERE Sample.DataSetName = '%s';"
        % (column_string, data_set)
    )
    df = pd.read_sql_query(sql_statement, self.conn)
    return self.GetFoundAnalytesLst(df)
8、清除数据集数据
def ClearDataSetData(self, data_sets, analyte_table_lst):
    """Delete all database data belonging to the given data sets.

    Parameters
    ----------
    data_sets : iterable of str
        Names of the data sets whose records are purged.
    analyte_table_lst : list of str
        Analyte tables referenced by foreign-key columns of Sample.
    """
    # Bug fix: the original appended 'DataSetConcentrations' to the
    # caller's list in place, mutating the argument. Work on a copy so
    # the caller's list is left untouched.
    tables = list(analyte_table_lst)
    # Include the data-set concentrations table in the cleanup as well.
    tables.append('DataSetConcentrations')
    # Comma-separated string of every foreign-key column in the Sample
    # table whose referenced data must be deleted.
    ForeignKeyColumn_lst = [col + '_foreignkey' for col in tables]
    ForeignKeyColumn_Columns = ', '.join(ForeignKeyColumn_lst)
    # OR-joined condition string selecting the requested data sets.
    data_set_condition = self.CreateConditionClause_OrSeriesStr(
        data_sets, "DataSetName")
    # Resulting df: each column corresponds to a table containing data to
    # delete; the values in each column are the primary keys of the rows
    # that must be removed from that table.
    sql_statement = 'SELECT %s FROM Sample WHERE %s;' % (
        ForeignKeyColumn_Columns, data_set_condition)
    df = pd.read_sql_query(sql_statement, self.conn)
    df.columns = tables
    # Iterate the dataframe column by column to delete each record from
    # the database. note: column = table name.
    for column in df:
        condition = self.CreateConditionClause_OrSeriesStr(
            set(df[column]), "id")
        self.conn.execute('DELETE FROM %s WHERE %s' % (column, condition))
    # Finally, also remove the defining records from the Sample table.
    self.conn.execute('DELETE FROM Sample WHERE %s' % data_set_condition)
    self.CommitDB()