免费视频淫片aa毛片_日韩高清在线亚洲专区vr_日韩大片免费观看视频播放_亚洲欧美国产精品完整版

打開APP
userphoto
未登錄

開通VIP,暢享免費電子書等14項超值服

開通VIP
Python+MongoDB導入股票日線及分鐘線數(shù)據(jù)

本文演示了采用Python腳本,配合MongoDB數(shù)據(jù)庫,讀取股票數(shù)據(jù)文件,并寫入數(shù)據(jù)庫的過程。 導入數(shù)據(jù)庫的方法支持單線程及多線程兩種方式。經(jīng)過本地開發(fā)機測試,開啟2-3個線程時,導入速度快于單線程導入;當開啟線程超過4個之后,導入速度反而小于單線程;猜測可能是跟機器CPU核心數(shù)有關,建議開始CPU核心數(shù)N-1的線程。例如,若CPU核心數(shù)為8時,可以開啟7個線程進行導入,可大幅提高導入速度。 Python操作MongoDB,需要引入pymongo模塊,可自行搜索該模塊的安裝方法,不再贅述。以下是Python代碼:

util_file.py 讀取數(shù)據(jù)文本,導入數(shù)據(jù)庫的公共方法類

#!/usr/bin/env python#coding:utf-8import threadingimport pymongoimport timeclass UtilFile:    def IsSubString(SubStrList,Str):        flag=True          for substr in SubStrList:              if not(substr in Str):                  flag=False          return flag    def GetFileList(FindPath,FlagStr=[]):        import os          FileList=[]          FileNames=os.listdir(FindPath)          if (len(FileNames)>0):             for fn in FileNames:                 if (len(FlagStr)>0):                     #返回指定類型的文件名                     if (UtilFile.IsSubString(FlagStr,fn)):                         #fullfilename=os.path.join(FindPath,fn)                         #FileList.append(fullfilename)                         FileList.append(fn)               else:                     #默認直接返回所有文件名                     #fullfilename=os.path.join(FindPath,fn)                     #FileList.append(fullfilename)                     FileList.append(fn)        #對文件名排序          if (len(FileList)>0):              FileList.sort()               return FileList    '''    函數(shù)功能:讀取通達信數(shù)據(jù)導出文件內(nèi)容,導入到數(shù)據(jù)庫中     入?yún)ⅲ?   conn_db: 導入所使用的數(shù)據(jù)庫    file_dir: 數(shù)據(jù)文件目錄    file_list: 數(shù)據(jù)目錄下文件名List    file_type: 數(shù)據(jù)文件類型。枚舉值。 M1:1分鐘線數(shù)據(jù); D: 日線數(shù)據(jù)    tb_pre_name: 數(shù)據(jù)表名稱前綴    print_level: 打印級別。枚舉值。0:不打印; 1:按文件打印; 2:按記錄打印    thread_name: 線程名稱(打印輸出用)    '''    def ImpDataFromFile(conn_db, file_dir, file_list, file_type, tb_pre_name, print_level, thread_name):        # file_list = UtilFile.GetFileList(file_dir)    #獲取1分鐘線目錄下文件名清單        file_num = len(file_list)       #文件總數(shù)量        file_prct = round(1/file_num,4)     #每文件占總數(shù)比        # 讀取文件清單中每個文件的內(nèi)容,并解析每行記錄 {        file_cnt = 0        for file in file_list:            file_cnt += 1   #當前處理文件序號            tm_start = time.clock()            #計算文件中的總行數(shù) {            op = open(file_dir+'/'+file)            record_num = 0            for line in op:                record_num += 1            #計算文件中的總行數(shù) }            #將文件逐行讀入表中 {            op = open(file_dir+'/'+file)            record_cnt = 0            for line in op:                if not("數(shù)據(jù)來源" in line):                    content_temp = line.split('\n')                    content = content_temp[0].split('\t')                    if file_type == "M1":                        record = {                                    "date":content[0], #日期                                    "time":content[1], #時間                                    "begin":float(content[2]), #開盤                                    "high":float(content[3]), #最高                                    "low":float(content[4]), #最低                                        "end":float(content[5]), #收盤                                    "vol":int(content[6]), #成交量                                    "amt":float(content[7])#成交額                                 }                    elif file_type == "D":                        record = {                                    "date":content[0], #日期                                    "begin":float(content[1]), #開盤                                    "high":float(content[2]), #最高                                    "low":float(content[3]), #最低                                        "end":float(content[4]), #收盤                                    "vol":int(content[5]), #成交量                                    "amt":float(content[6])#成交額                                 }                    clct = conn_db[tb_pre_name+file]    #打開/創(chuàng)建 數(shù)據(jù)表                    #計算處理百分比 {                    record_cnt += 1                    if print_level == 2:                        file_prcs = file_prct*(file_cnt-1)                        record_prct = round(record_cnt/record_num,4)                        ttl_prcs = round((file_prcs+file_prct*record_prct)*100,2)                        print(thread_name+"\t"+str(ttl_prcs)+"%"+"\t"+str(file_cnt)+"/"+str(file_num)+"\t"+file+"\t"+str(record_cnt)+"/"+str(record_num))                    #計算處理百分比 }                    #確保索引存在 {                    if file_type == "M1":                        clct.ensure_index([("date", pymongo.DESCENDING), ("time", pymongo.DESCENDING)])                        if clct.find({"date":content[0],"time":content[1]}).count() > 0:                            pass        #防止重復導入                        else:                            clct.insert(record)                    elif file_type == "D":                        clct.ensure_index([("date", pymongo.DESCENDING)])                        if clct.find({"date":content[0]}).count() > 0:                            pass        #防止重復導入                        else:                            clct.insert(record)                    #確保索引存在 }            #將文件逐行讀入表中 }            tm_finish = time.clock()            tm_spend = round(tm_finish - tm_start, 2)            if print_level == 1:                file_prcs = round(file_prct*file_cnt*100,2)                print(thread_name+"\t"+str(file_prcs)+"%"+"\t"+str(file_cnt)+"/"+str(file_num)+"\t"+file+"\t"+str(tm_spend)+"s")        # 讀取文件清單中每個文件的內(nèi)容,并解析每行記錄 }        print(thread_name+"\t"+"Import Done!")    '''    函數(shù)功能:使用多線程方式導入數(shù)據(jù)入口方法     入?yún)ⅲ?   imp_thread_num: 開啟線程數(shù)    conn_db: 導入所使用的數(shù)據(jù)庫    file_dir: 數(shù)據(jù)文件目錄    file_type: 數(shù)據(jù)文件類型。枚舉值。 M1:1分鐘線數(shù)據(jù); D: 日線數(shù)據(jù)    tb_pre_name: 數(shù)據(jù)表名稱前綴    print_level: 打印級別。枚舉值。0:不打印; 1:按文件打印; 2:按記錄打印    '''    def ImpDataFromFileMultThrd(imp_thread_num, conn_db, file_dir, file_type, tb_pre_name, print_level):        file_list = UtilFile.GetFileList(file_dir, ".txt")        thrd_num = imp_thread_num        thrd_task = []        for i in range(thrd_num):            thrd_task.append([])        for i in range(len(file_list)):            idx = divmod(i,thrd_num)[1]            thrd_task[idx].append(file_list[i])        print(thrd_task)        for i in range(thrd_num):            thrd = MyThread(i,"Thread-"+str(i+1),[conn_db, file_dir, thrd_task[i-1], file_type, tb_pre_name, print_level])            thrd.start()class MyThread (threading.Thread):   #繼承父類threading.Thread    def __init__(self, threadID, name, func_args):        threading.Thread.__init__(self)        self.threadID = threadID        self.name = name        self.conn_db = func_args[0]        self.file_dir = func_args[1]        self.file_list = func_args[2]        self.file_type = func_args[3]        self.tb_pre_name = func_args[4]        self.print_level = func_args[5]    def run(self):                   #把要執(zhí)行的代碼寫到run函數(shù)里面 線程在創(chuàng)建后會直接運行run函數(shù)         print("Starting " + self.name)        UtilFile.ImpDataFromFile(self.conn_db, self.file_dir, self.file_list, self.file_type, self.tb_pre_name, self.print_level, self.name)        print("Exiting " + self.name)

util_otr.py 一些其它常用的公共函數(shù)方法

#!/usr/bin/env python#coding:utf-8'''函數(shù)功能:根據(jù)傳入的字符串,刪除名稱匹配的聚集 入?yún)ⅲ篸b_conn: 數(shù)據(jù)庫連接clct_substr: 需匹配名稱的字符串'''def DelClcts(db_conn, clct_substr):    a = db_conn.collection_names()    b = []    for i in a:        if clct_substr in i:            b.append(i)            db_conn[i].drop()            print("Drop:"+i)'''函數(shù)功能:對Dict按Key排序入?yún)ⅲ篸ic: 數(shù)據(jù)字典'''def SortDictKeys(dic):     return sorted(dic.items(), key=lambda d: d[0])

main.py 導入的主程序,入口方法

#!/usr/bin/env python#coding:utf-8from util_file import UtilFile# from util_thread import MyThreadimport pymongo#************************************************************************#常量定義清單#************************************************************************#文件stck_m1_file_dir = "D:/new_tdx/T0002/export/min1"   #1分鐘線數(shù)據(jù)文件目錄stck_day_file_dir = "D:/new_tdx/T0002/export/day"   #1分鐘線數(shù)據(jù)文件目錄#數(shù)據(jù)庫ip = '127.0.0.1'port = 27017db_name = 'robin'#表名稱bs_m1_pre = 'BS_M1_'#基礎表_1分鐘線數(shù)據(jù)表前綴bs_day_pre = 'BS_DAY_'#基礎表_日線數(shù)據(jù)表前綴#************************************************************************#主程序#************************************************************************conn = pymongo.MongoClient(host=ip, port=port)      #連接Mongodb數(shù)據(jù)庫服務器db = conn[db_name]      #選擇存儲數(shù)據(jù)庫#************************************************************************##從文件中導入數(shù)據(jù)到基礎表#************************************************************************# UtilFile.ImpDataFromFileMultThrd(2, db, stck_m1_file_dir, "M1", bs_m1_pre, 1)     #導入1分鐘線數(shù)據(jù)UtilFile.ImpDataFromFileMultThrd(2, db, stck_day_file_dir, "D", bs_day_pre, 1)      #導入日線數(shù)據(jù)
本站僅提供存儲服務,所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權內(nèi)容,請點擊舉報。
打開APP,閱讀全文并永久保存 查看更多類似文章
猜你喜歡
類似文章
Linux——線程編程
java中對IO文件的操作(讀,寫,增加一行,刪除一行,刪除文件
在Java實現(xiàn)Dos中的文件操作命令功能
TFIDF實現(xiàn)
賜予我力量吧,stata!
創(chuàng)建、刪除、復制文件夾及其子文件
更多類似文章 >>
生活服務
分享 收藏 導長圖 關注 下載文章
綁定賬號成功
后續(xù)可登錄賬號暢享VIP特權!
如果VIP功能使用有故障,
可點擊這里聯(lián)系客服!

聯(lián)系客服