周航宇的实验报告

代码

import re
  from selenium import webdriver
  from selenium.webdriver.common.by import By
  from selenium.webdriver.common.keys import Keys
  from bs4 import BeautifulSoup
  from lxml import etree
  import fitz
  import re
  import pandas as pd
  import time
  import requests





  # Download the industry-classification PDF published on csrc.gov.cn and
  # save it in the working directory under the name the parsing step expects.
  href = 'http://www.csrc.gov.cn/csrc/c100103/c1558619/1558619/files/1638277734844_11692.pdf'
  r = requests.get(href, allow_redirects=True)
  try:
      # Fail loudly on an HTTP error instead of saving an error page as a PDF.
      r.raise_for_status()
      with open('2021年3季度上市公司行业分类结果.pdf', 'wb') as f:
          f.write(r.content)
  finally:
      # Release the connection even when the request or the write failed.
      r.close()

   #获取行业分类结果PDF文件,并选出在31类行业中所有的上市公司
  # Open the downloaded classification PDF and extract the text of one page.
  doc = fitz.open('2021年3季度上市公司行业分类结果.pdf')
  # The page index is hard-coded; per the comment preceding this section it
  # should be the page covering industry class 31 - TODO confirm against the PDF.
  page = doc[71]
  toc_txt = page.get_text()

  # Matches a 6-digit code on its own line followed by a word-character line
  # (optionally starting with '*'); findall returns one 2-tuple per match.
  p1 = re.compile(r'(?<=\n)(\d{6})\n(\*?\w+)*(?=\n)')

  txt = p1.findall(toc_txt)

  # Keep matches 25..40 (16 entries) and turn the pairs into a dict keyed by
  # the first tuple element (the 6-digit code).
  firm = txt[25:41]
  firm1=dict(firm)
  # NOTE(review): `id` shadows the builtin of the same name; the scraping loops
  # further down index this list by position.
  id=list(firm1.keys())






  # Collect the annual-report listing table of the first nine stock codes from
  # the szse.cn disclosure page and append each table's HTML to 年报链接.html.
  # NOTE(review): executable_path points at MicrosoftEdge.exe; Selenium expects
  # the driver binary here - confirm the path on the target machine.
  browser = webdriver.Edge(executable_path='C:\\Users\\Lenovo\\Downloads\\edgedriver_win64\\MicrosoftEdge.exe')
  try:
      browser.get('https://www.szse.cn/disclosure/listed/fixed/index.html')

      # Fill in the start and end of the date filter.
      y_start = browser.find_element(By.CLASS_NAME, 'input-left')
      y_start.send_keys('2012' + Keys.RETURN)
      y_end = browser.find_element(By.CLASS_NAME, 'input-right')
      y_end.send_keys('2022-05-01' + Keys.RETURN)

      # Slicing instead of indexing 0..8 tolerates a code list shorter than nine.
      for code in id[:9]:
          # Choose the "annual report" announcement category.
          browser.find_element(By.LINK_TEXT, '请选择公告类别').click()
          browser.find_element(By.LINK_TEXT, '年度报告').click()

          # Type the stock code, give the page time to react, then submit.
          element = browser.find_element(By.ID, 'input_code')
          element.send_keys(code)
          time.sleep(2)
          element.send_keys(Keys.RETURN)

          # Wait for the result table to refresh before reading its HTML.
          element = browser.find_element(By.ID, "disclosure-table")
          time.sleep(2)
          innerHTML = element.get_attribute("innerHTML")
          time.sleep(2)

          # Append so that the tables of all companies end up in one file.
          with open("年报链接.html", 'a', encoding='utf-8') as f:
              f.write(innerHTML)
          time.sleep(2)

          # Reset the filters before the next company.
          browser.find_element(By.CSS_SELECTOR, ".btn-clearall").click()
  finally:
      # Always shut the browser down, even if a lookup above raised.
      browser.quit()

  # Collect the annual-report listing of the stock codes at positions 9..15
  # from the sse.com.cn regular-disclosure page, appending each result's HTML
  # to 年报链接.html. A fresh browser session is started for every code.
  for code in id[9:16]:
      browser = webdriver.Edge(executable_path='C:\\Users\\Lenovo\\Downloads\\edgedriver_win64\\MicrosoftEdge.exe')
      try:
          browser.get('http://www.sse.com.cn/disclosure/listedinfo/regular/')

          # Focus the code box and type the stock code.
          browser.find_element(By.ID, "inputCode").click()
          browser.find_element(By.ID, "inputCode").send_keys(code)
          time.sleep(2)

          # Open the report-type dropdown and pick "annual report".
          browser.find_element(By.CSS_SELECTOR, ".sse_outerItem:nth-child(4) .filter-option-inner-inner").click()
          time.sleep(2)
          browser.find_element(By.LINK_TEXT, "年报").click()
          # Same total wait as before (2 s + 2 s) for the result list to reload.
          time.sleep(4)

          element = browser.find_element(By.CLASS_NAME, 'common_con')
          time.sleep(2)
          innerHTML = element.get_attribute("innerHTML")
          time.sleep(2)

          # The context manager closes (and therefore flushes) the file, so the
          # later read of 年报链接.html sees every appended table.
          with open("年报链接.html", 'a', encoding='utf-8') as f:
              f.write(innerHTML)
          time.sleep(2)
      finally:
          # Do not leave a browser window behind when a lookup fails.
          browser.quit()


    # 解析爬取网页的源代码,获取相关信息及年报下载链接

  def to_pretty(fhtml):
      """Prettify an HTML file and save the result next to it.

      The file is read as UTF-8, re-indented with BeautifulSoup and written to
      a second file named after ``fhtml`` with its last five characters
      replaced by ``-prettified.html``.

      Args:
          fhtml: Path of the HTML file to read; expected to end in ``.html``
              because the output name is built by cutting five characters.

      Returns:
          The prettified HTML text.
      """
      with open(fhtml, encoding='utf-8') as f:
          html = f.read()

      # Name the parser explicitly: without it BeautifulSoup warns and picks
      # whichever parser is installed, preferring lxml when it is available,
      # and this script already requires lxml.
      soup = BeautifulSoup(html, 'lxml')
      html_prettified = soup.prettify()

      with open(fhtml[0:-5] + '-prettified.html', 'w', encoding='utf-8') as f:
          f.write(html_prettified)
      return html_prettified


  # Prettify the collected listing HTML; `html` holds the prettified text.
  html = to_pretty('年报链接.html')


  # Abandoned attempt to select the download cells with lxml/XPath:
  #html1=etree.HTML(html)
  #result=html1.xpath('//td[@class="pull-left ellipsis title-icon" title="点击下载公告文件"]')      (not sure how to get this working)

  def txt_to_df(html):
      """Split HTML table rows into a DataFrame of raw cell contents.

      Every ``<tr>`` element after the first one found is split into its
      ``<td>`` cells. Rows with fewer than four cells (for example header rows
      that only contain ``<th>``) are skipped. Cell contents are kept as raw
      inner HTML, unstripped.

      Args:
          html: HTML text containing one or more tables.

      Returns:
          A DataFrame with the columns 证券代码, 简称, 公告标题 and 公告时间,
          filled from the first four cells of each kept row.
      """
      # The patterns need the tag delimiters; a bare '(.*?)' only ever yields
      # empty strings, which made the column extraction below fail.
      p = re.compile(r'<tr\b[^>]*>(.*?)</tr>', re.DOTALL)
      trs = p.findall(html)

      p2 = re.compile(r'<td\b[^>]*>(.*?)</td>', re.DOTALL)
      # trs[1:] skips the first row found, as before.
      rows = [p2.findall(tr) for tr in trs[1:]]

      # Four cells are read per row, so anything shorter cannot be used.
      tds = [cells for cells in rows if len(cells) >= 4]
      df = pd.DataFrame({'证券代码': [td[0] for td in tds],
                         '简称': [td[1] for td in tds],
                         '公告标题': [td[2] for td in tds],
                         '公告时间': [td[3] for td in tds]})
      return df

  # Raw cell contents of the prettified listing HTML, one DataFrame row per
  # table row kept by txt_to_df.
  df_txt = txt_to_df(html)

  # Patterns for the text wrapped by the first <a> / <span> element of a cell.
  # Without the tag delimiters the lazy group always matched the empty string.
  p_a = re.compile(r'<a\b[^>]*>(.*?)</a>', re.DOTALL)
  p_span = re.compile(r'<span\b[^>]*>(.*?)</span>', re.DOTALL)


  def get_code(txt):
      """Return the stripped text inside the first ``<a>`` element of ``txt``.

      Raises:
          AttributeError: If ``txt`` contains no ``<a>...</a>`` element.
      """
      return p_a.search(txt).group(1).strip()


  def get_time(txt):
      """Return the stripped text inside the first ``<span>`` element of ``txt``.

      Raises:
          AttributeError: If ``txt`` contains no ``<span>...</span>`` element.
      """
      return p_span.search(txt).group(1).strip()

  def get_link(txt):
      """Extract the download path, page link and title from a title cell.

      The cell is expected to hold an ``<a>`` element carrying ``attachpath``
      and ``href`` attributes (in that order) and wrapping a ``<span>`` with
      the announcement title.

      Args:
          txt: Raw inner HTML of the title cell.

      Returns:
          ``[attachpath, href, title]``, each stripped of surrounding
          whitespace.

      Raises:
          ValueError: If ``txt`` does not contain such a link.
      """
      # Three groups are read below, so the pattern has to capture all three.
      p_txt = r'<a\b.*?attachpath="(.*?)".*?href="(.*?)".*?<span\b[^>]*>(.*?)</span>'
      p = re.compile(p_txt, re.DOTALL)
      matchObj = p.search(txt)
      if matchObj is None:
          raise ValueError('no attachpath/href/title link found in cell')
      attachpath = matchObj.group(1).strip()
      href       = matchObj.group(2).strip()
      title      = matchObj.group(3).strip()
      return [attachpath, href, title]

  def get_data(df_txt):
      """Turn the raw cell HTML of ``df_txt`` into a clean listing table.

      Args:
          df_txt: DataFrame with the raw-HTML columns 证券代码, 简称, 公告标题
              and 公告时间.

      Returns:
          A new DataFrame with the extracted code, short name, title and time
          of every row, plus ``attachpath`` and ``href`` columns built by
          prepending the download and site prefixes to the parsed link parts.
      """
      download_root = 'https://disc.szse.cn/download'
      site_root = 'https://www.szse.cn/'

      # Extract column by column, in the same order as the output columns.
      code_col = list(map(get_code, df_txt['证券代码']))
      name_col = list(map(get_code, df_txt['简称']))
      links = list(map(get_link, df_txt['公告标题']))
      time_col = list(map(get_time, df_txt['公告时间']))

      # Each parsed link is [attachpath, href, title].
      columns = {}
      columns['证券代码'] = code_col
      columns['简称'] = name_col
      columns['公告标题'] = [link[2] for link in links]
      columns['attachpath'] = [download_root + link[0] for link in links]
      columns['href'] = [site_root + link[1] for link in links]
      columns['公告时间'] = time_col
      return pd.DataFrame(columns)

  df = get_data(df_txt)

  #下载年报
   #过滤年报摘要与已取消的年报
  def tidy(df):
      """Drop announcements whose title marks a summary or a cancellation.

      The title is taken from the third column of ``df``. A row is removed
      when its title contains 摘要 or 取消.

      Args:
          df: Listing table whose third column holds the announcement titles
              as strings.

      Returns:
          A new DataFrame without the removed rows and with a fresh
          0..n-1 index; ``df`` itself is left untouched.
      """
      # Select the title column by position explicitly; integer keys on a
      # label-indexed row (row[2]) are deprecated positional access in pandas.
      titles = df.iloc[:, 2]
      unwanted = [
          label
          for label, title in zip(df.index, titles)
          if re.search("摘要|取消", title) is not None
      ]
      return df.drop(unwanted).reset_index(drop=True)

  # Remove summaries and cancelled announcements first.
  df = tidy(df)

  # Title endings that identify a full annual report or one of its revised /
  # corrected editions. Titles are stripped when they are parsed, so the
  # endings must not carry trailing blanks or they can never match.
  report_suffixes = (
      '年度报告',
      '年报',
      '年报(修订版)',
      '年度报告(修订)',
      '年度报告-更正后',
      '年度报告(更正稿)',
      '年度报告(修订版)',
      '年度报告(更正后)',
      '年度报告(更正修订)',
  )

  # str.endswith accepts a tuple, so one pass replaces the chain of
  # per-suffix frames and exclusions; astype(bool) keeps the mask boolean even
  # for an empty table.
  is_report = df['公告标题'].map(
      lambda title: title.endswith(report_suffixes)
  ).astype(bool)

  df_final = df[is_report].reset_index(drop=True)
  # NOTE(review): the effect of replace('-', None) differs between pandas
  # versions (older releases forward-fill instead of inserting None) - confirm
  # the intended result.
  df_final = df_final.replace('-',None)

    #下载年报
  import requests

  # Download the listed reports as PDF files named <code><title>.pdf.
  # NOTE(review): this reads `df`, not the filtered `df_final` computed just
  # before - confirm which table is meant.
  max_reports = 85
  # Never index past the end of the table when it has fewer rows than the cap.
  for i in range(min(max_reports, len(df))):
      r = requests.get(df['attachpath'][i], allow_redirects=True)
      time.sleep(2)
      try:
          if not r.ok:
              # Skip failed downloads rather than saving an error page as PDF.
              print('download failed:', df['attachpath'][i], r.status_code)
              continue
          with open(df['证券代码'][i]+df['公告标题'][i]+'.pdf', 'wb') as f:
              f.write(r.content)
      finally:
          # Release the connection for every request, successful or not.
          r.close()


结果

结果截图 转换成字典 解释

看了好多同学的报告,但自己学得不太好,没能完全复现结果,真的尽力了。