纪懿玲的作业一

代码


# -*- coding: utf-8 -*-
import web_crawling as wc
import find_assignment as fa
import download as dl
import tool
import read_data as rd
import draw_pics as dp
import pandas as pd
import numpy as np

if __name__ == "__main__":
    df_code = fa.find_assignment(tool.get_No(),10)
    code_list = df_code['code_list']
    name_list = df_code['name_list']
    for code in code_list:
        wc.get_table(code)
        print("股票" + code + " completed...")
    
    for code in code_list:
        dl.get_pdf_link(code)
        print("股票" + code + "的年报下载 completed...")

    tool.clean_pdf()
    
    data = rd.read_all_data(df_code)
    seperated_data = tool.seperate_df(df_code,data)
    
    np.save('seperated_data.npy',seperated_data)
    seperated_data = np.load('seperated_data.npy',allow_pickle = 'True').item()
    
    for key in seperated_data.keys():
        dp.draw_pics_twinx(seperated_data.get(key))
        import fitz
import re
import pandas as pd

#判断行业中上市公司个数是否满足大于等于10的要求
def does_match(text):
    if len(re.findall('\n\d{6}\n',text)) < 10:
        return False
    else:
        return True


def find_assignment(No,num): 
    #No为int类型,表示在本次作业中该学生的序号
    #num代表该行业前num家上市公司
        
        #读取所有的内容
    text = ''
    with fitz.open('../1638277734844_11692.pdf') as doc:
        for page in doc:
            text = text + page.get_text('text')
        
    #找到所有行业的两位代码
    locs = re.finditer('\n\d{2}\n',text)
    
    #注意locs里面可能有重复的行业标号,需要筛选一下
    loc_dict = {}
    for loc in locs:
        #loc.span()的结果为左开右闭的区间,去除两端的换行符
        s = loc.span()[0] + 1
        e = loc.span()[1] - 1
        if text[s:e] in loc_dict:
            continue
        else:
            loc_dict[text[s:e]] = [s,e]
    
    #获取有效行业(上市公司>=10),将他们放入列表effective_industries中,

    last_key = '' #保存上一次循环中的key
    last_value = 0 #保存上一次循环中的value[1],即行业代码的索引的后一位置(为换行符)
    effective_industries = []
    for key,value in loc_dict.items():
        
        if key != '01':
            subtext = text[last_value:value[0]]
            if does_match(subtext) == False:
                last_key = key
                last_value = value[1]
                continue
            else:
                effective_industries.append(last_key)   
        #第一个的key必定是01,所以第一次循环直接跳到这一步
        last_key = key
        last_value = value[1]
        
    #注意,最后还剩下一部分没循环到
    subtext = text[last_value:len(text)]
    if does_match(subtext) == True:
        effective_industries.append(key)
    
    #判断No匹配哪个行业
    #可能会出现No超过了有效行业数,故采用模运算
    No = No % len(effective_industries)
    
    my_industry = effective_industries[No-1] 
    
    print('第'+ str(No) + '号的作业是行业 ' + my_industry)
    
    '''
    附加代码,找到自己的行业包含的所有公司代码,以列表返回
    '''

    s = loc_dict.get(my_industry)[1]
    
    #获得下一个键值对,如'01'之后为'02'
    if int(my_industry) < 9:
        nxt = '0' + str(int(my_industry) + 1)
    elif int(my_industry) == 9:
        nxt = '10'
    else:
        nxt = str(int(my_industry) + 1)
    e = loc_dict.get(nxt)[0]
    
    subtext = text[s:e]
    
    code_list = re.findall('\n\d{6}\n',subtext)
    code_list = [code.strip() for code in code_list]
    
    '''
    附加代码,找到自己的行业包含的所有公司名称,以列表返回
    '''
    name_list = []
    
    for code in code_list:
        code_first_loc = subtext.find(code)
        start = code_first_loc + 7
        end = subtext.find('\n', start)
        
        name = subtext[start : end]
        name_list.append(name)
    
    '''
    把公司代码code_list和公司名称name_list合并成dataframe
    '''
    df = {'code_list':code_list[0:num], 'name_list':name_list[0:num]}
    df = pd.DataFrame(df)
    
return df
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support import expected_conditions
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
import time

import tool

def get_table_sse(code):#code here refers to stock code
    
    browser = webdriver.Chrome()
    url = 'http://www.sse.com.cn/disclosure/listedinfo/regular/'
    browser.get(url)
    time.sleep(3)
    browser.set_window_size(1550, 830)
    browser.find_element(By.ID, "inputCode").click()
    browser.find_element(By.ID, "inputCode").send_keys(code)
    time.sleep(3)
    browser.find_element(By.CSS_SELECTOR, ".sse_outerItem:nth-child(4) .filter-option-inner-inner").click()
    browser.find_element(By.LINK_TEXT, "年报").click()
    # dropdown = browser.find_element(By.CSS_SELECTOR, ".dropup > .selectpicker")
    # dropdown.find_element(By.XPATH, "//option[. = '年报']").click()
    time.sleep(3)#sleep的原因是需要等待浏览器渲染
    
    css_selector = "body > div.container.sse_content > div > div.col-lg-9.col-xxl-10 > div > div.sse_colContent.js_regular > div.table-responsive > table"
    element = browser.find_element(By.CSS_SELECTOR, css_selector)
    table_html = element.get_attribute('innerHTML')
    fname = f'../nianbao/{code}.html'
    with open(fname,'w',encoding='utf-8') as f:
        f.write(table_html)


def get_table_szse(code):
    browser = webdriver.Chrome()
    url = 'http://www.szse.cn/disclosure/listed/fixed/index.html'
    browser.get(url)
    time.sleep(3)
    
    browser.set_window_size(1550, 840)
    browser.find_element(By.ID, "input_code").click()
    browser.find_element(By.ID, "input_code").send_keys(code)
    time.sleep(3)
    
    browser.find_element(By.CSS_SELECTOR, ".active:nth-child(1) > a").click()
    browser.find_element(By.CSS_SELECTOR, "#select_gonggao .c-selectex-btn-text").click()
    time.sleep(3)
    browser.find_element(By.LINK_TEXT, "年度报告").click()
    time.sleep(3)
    
    css_selector = "#disclosure-table > div > div.table-con-outer > div > table"
    element = browser.find_element(By.CSS_SELECTOR, css_selector)
    table_html = element.get_attribute('innerHTML')
    fname = f'../nianbao/{code}.html'
    with open(fname,'w',encoding='utf-8') as f:
        f.write(table_html)


def get_table(code):
    market = tool.which_market(code)
    if market == 'sse':
        get_table_sse(code)
    elif market == 'szse':
        get_table_szse(code)
        import re
import pandas as pd
import os
import tool
import pdfplumber

def is_fin_number(string):
    if string == '':
        return False
    try: 
        string = string.strip()
        string = string.replace(',','')
    
    except: return False
    

    for s in string:
        if s.isdigit() == True or s == '-' or s == '.' or s == ' ' or s == '\n':
            continue
        else:
            return False
    return True

def get_data(row,name_mode):
    rc = re.compile(name_mode,re.DOTALL)

    bound = 0
    for i in range(0,len(row)):
        
        rs = None
        try: 
            rs = rc.search(row[i]) #row[i]可能是None
        except: 
            continue
        
        if rs is None:
            continue
        else:
            bound = i
            break
    
    if rs is None: #意味着没有找到
        return -1
    
    for i in range(bound,len(row)):
        if is_fin_number(row[i]) == True:
            return row[i]
        
    return 'other row' # 说明虽然匹配到了文字,但数据不在当行
    
#该页是否是主要会计数据和财务指标
def is_this_page(text):
    mode = '\n.*?主+要+会+计+数+据+和+财+务+指+标+.*?\n'
    if re.search(mode,text) is None:
        return False
    else: 
        return True


def get_twin_data(fname):
    
    earnings = -1
    
    try: #未知原因, 打开文件时会出现assert error
        with pdfplumber.open('../pdf/' + fname) as pdf:
            
            s = 0
            for i in range(0,len(pdf.pages)):
                text = pdf.pages[i].extract_text()
                if is_this_page(text) == True:
                    s = i
                    break
                else: 
                    continue
            
            page_index = 0
            bound = 0
            for i in range(s,s+2): #deterministic
    
                table = pdf.pages[i].extract_table()
                
                try: len(table)
                except: continue
            
                for j in range(0,len(table)):
                    e = get_data(table[j],'.*?营业收入.*?')

                    #此时文字和数据错行,需要继续往上搜索
                    if e == 'other row':
                        for k in range(j-1, 0,-1):
                            for h in range(0,len(table[k])):
                                if is_fin_number(table[k][h]) == True:
                                    e = table[k][h]
                                    break
                                else: 
                                    continue
                            else:
                                if is_fin_number(e) == True:
                                    break
                                
                    if e != -1:
                        earnings = e
                        bound = j
                        break
                    else:
                        continue
                
                if earnings == -1:
                    continue
                
                page_index = i
                break
            
            #循环结束仍然没有获得营业收入
            if earnings == 0:
                return None
            
            net_income = -1
            for i in range(page_index,page_index + 2):
                table = pdf.pages[i].extract_table()
                
                try: len(table)
                except: continue     
                
                ni_mode = '.*?归属于.*?(所有者|股东)?的?.?净?.?利?.?润?.*?'
                if i == page_index: #说明此时还没有换页
                    for j in range(bound + 1,len(table)):
                        ni = get_data(table[j], ni_mode)
                        
                        #此时文字和数据错行,需要继续往下搜索
                        if ni == 'other row':
                            for k in range(j, len(table)):
                                for h in range(0,len(table[k])):
                                    if is_fin_number(table[k][h]) == True:
                                        net_income = table[k][h]
                                        return [earnings,net_income]
                                    else: 
                                        continue
                                    
                        if ni == 'other row':
                            return 'data is at the next page'
                        elif ni != -1:
                            net_income = ni
                            break
                        else:
                            continue
                else: #此时换页
                    for j in range(0,len(table)):
                        ni = get_data(table[j], ni_mode)
                        if ni != -1:
                            net_income = ni
                            break
                        else:
                            continue
                    
                if net_income == -1: continue
                else: return [earnings,net_income]
                
    except: print(fname+'出现AssertionError')
'''
import read_data           
read_data.get_twin_data('../pdf/晨鸣纸业:2012年年度报告.PDF')
'''
#该函数需要在pdf目录下查找对应的文件名
def read_all_data(df):
    #df为包含两列(code_list和name_list)的dataframe

    filename_list = []
    year_list = []
    data_list = []
    for index,row in df.iterrows():
        for filepath,dirnames,filenames in os.walk('../pdf'):
            for filename in filenames:
                #print(filename)
                if (row['name_list'] in filename) or (row['code_list'] in filename):
                    print(filename)
                    data = get_twin_data(filename)
                    
                    if data is not None:
                        filename_list.append(filename)
                        year_list.append(tool.get_year(filename,row['code_list']))
                        data_list.append(get_twin_data(filename))
                        print(filename + ' completed')
    rt_list,ni_list = zip(*data_list)
    df_data = {'filename':filename_list,'year':year_list,
               '营业收入':rt_list,'净利润':ni_list}
    df_data = pd.DataFrame(df_data)    
    return df_data
    import os
import re
import pandas as pd

def get_No():
    print('程序开始前的注意事项:\n')
    print("1.请先下载好相关模块,并在网络通畅的情况下运行本程序;\n")
    print("2.本程序的爬虫部分使用的是chrome浏览器,请下载对应的webdriver;\n")
    print('3.下载时间较长,请耐心等待;\n')
    print('4.由于爬取深交所的代码的下载路径是绝对路径,请先创建如下目录路径:')
    print("D:\\CS\\py_fi\\scores_3\\nianbao\\src\\pdf\n")
    print('\n\n\n--------程序开始--------\n\n\n')
    return int(input("请输入你的序号:"))

def to_wan(num):
    return num/10000
def to_yi(num):
    return num/100000000

def is_year(string):
    if len(string) == 4:
        return True
    else:
        return False
def to_num(string):
    if type(string) == type('str'):
        string = string.replace(',','')
        string = string.replace('\n','')
        return float(format(to_yi(float(string)), '.3f'))
    else:
        return string
def to_year_list(str_list):
    for i in range(0,len(str_list)):
        str_list[i] = str(str_list[i])

def to_num_list(str_list):
    for i in range(0,len(str_list)):
        str_list[i] = to_num(str_list[i])

def which_market(code):
    if code[0:2] == '60' or code[0:3] == '688' or code[0:3] == '900':
        return 'sse'
    elif code[0:2] == '00' or code[0:3] == '200' or code[0:2] == '30':
        return 'szse'

def clean_pdf():
    for filepath,dirnames,filenames in os.walk('../pdf'):
        for filename in filenames:
            if '取消' in filename:
                os.remove('../pdf/'+ filename)
                print(filename + '+ deleted')

def get_year_sse(fname):
    year = re.search('\d{6}_(\d{4}).*?\.pdf',fname,re.IGNORECASE)
    return year.group(1)

def get_year_szse(fname):
    year = re.search('.*?(\d{4}).*?\.pdf',fname,re.IGNORECASE)
    return year.group(1)

def get_year(fname,code):
    m = which_market(code)
    if m == 'sse':
        return get_year_sse(fname)
    elif m == 'szse':
        return get_year_szse(fname)

def be_contigious(this_data):
    #对于某个公司的绘图数据,我们只取从最近时间到最远时间的连续数据
    length = len(this_data)
    last = int(this_data['year'][length - 1])
    for i in range(length-2, -1, -1):
         nxt = int(this_data['year'][i])
         if last - nxt != 1:#说明不连续
             return this_data.loc[i+1 : length]
         else:
             continue
    return this_data
def seperate_df(df_code,data):
    seperated_data = {}
    
    for j,row1 in df_code.iterrows():
        name = row1['name_list']
        code = row1['code_list']
        this_data = pd.DataFrame(columns = ['year','营业收入','净利润'])
        for i,row2 in data.iterrows():
            fn = row2['filename']
            if name in fn or code in fn:
                data_dict = {'name': name,
                             'year': row2['year'],
                             '营业收入':row2['营业收入'],
                             '净利润':row2['净利润']}
                this_data = this_data.append(data_dict,ignore_index=True)
                be_contigious(this_data)
        seperated_data[code] = this_data
return seperated_data
import matplotlib.pyplot as plt
import numpy as np
import tool

def draw_pics_twinx(df):
    
    plt.rcParams['figure.dpi'] = 200
    plt.rcParams['axes.unicode_minus']=False #用来正常显示负号
    plt.rcParams['font.sans-serif'] = ['SimHei'] # 使图片显示中文
    
    
    x = df['year']
    tool.to_year_list(x)
    y_rt = df['营业收入']
    tool.to_num_list(y_rt)
    y_ni = df['净利润']
    tool.to_num_list(y_ni)
    
    fig = plt.figure()
    
    
    ax1 = fig.subplots()
    ax1.plot(x, y_rt,'steelblue',label="营业收入",linestyle='-',linewidth=2,
             marker='o',markeredgecolor='pink',markersize='2',markeredgewidth=2)
    
    ax1.set_xlabel('年份')
    ax1.set_ylabel('营业收入(单位:亿元)')
    for i in range(len(x)):
        plt.text(x[i],y_rt[i],(y_rt[i]),fontsize = '10')
    ax1.legend(loc = 6) 

    ax2 = ax1.twinx()
    ax2.plot(x, y_ni, 'orange',label = "归属于股东的净利润",linestyle='-',linewidth=2,
             marker='o',markeredgecolor='teal',markersize='2',markeredgewidth=2)
    
    ax2.set_ylabel('归属于股东的净利润(单位:亿元)')
    for i in range(len(x)):
        plt.text(x[i],y_ni[i],(y_ni[i]),fontsize = '10')
    ax2.legend(loc = 2)
    
    '''
    title部分必须放最后,否则会出现左边的y轴有重复刻度的问题
    '''
    title = df['name'][0] + '历年' + '财务数据'
    plt.title(title)
    
    plt.savefig('../pics/' + title + '.png')
    plt.show()


结果

结果截图1 结果截图2 结果截图3 结果截图4 结果截图4 结果截图5 结果截图1 结果截图2 结果截图3 结果截图4 结果截图5 结果截图6 结果截图7 结果截图8 结果截图9 结果截图10

解释

一.行业分析

1.营业收入解读

根据营业收入随时间变化的趋势图(2013-2022),目前医药制造业第一梯队公司是云南白药、东北制药、东阿阿胶和丰原药业。

(1)云南白药集团股份有限公司,前身为成立于1971年6月的云南白药厂。1993年5月3日经云南省经济体制改革委员会云体(1993)48号文批准,云南白药厂进行现代企业制度改革,成立云南白药实业股份有限公司。公司于1993年11月首次向社会公众发行股票2,000万股(含20万内部职工股),定向发行400万股,发行价格3.38元/股,发行后总股本8,000万股。经中国证券监督管理委员会批准,1993年12月15日公司社会公众股(A股)在深圳证券交易所上市交易,内部职工股于1994年7月11日上市交易。1996年10月经临时股东大会会议讨论,公司更名为云南白药集团股份有限公司。

经过30多年的发展,公司已从一个资产不足300万元的生产企业成长为一个总资产76.3亿多,。截止到2022年,云南白药的的营业收入为364.884亿元人民币,10年间增长了125%。主要有以下三个原因: ①云南白药开展不同版块的产品,致使扩大消费群体 ②品牌影响力,提到云南白药大家都会想到药用价值,所以很多产品会因为品牌效应去购买 ③疫情下的药物需求增长,云南白药并未受到冲击。

(2)东北制药集团股份有限公司(简称“东北制药”),是辽宁方大集团实业有限公司(简称“方大集团”)旗下上市公司。公司前身为东北制药总厂,始建于1946年。东北制药拥有化学原料药、化学制剂、医药商业、医药工程、生物医药等主要业务板块,覆盖医药研发、制造、分销全产业链条,员工队伍7000余人,总资产120亿元。拥有原料药、制剂两大生产基地。并全面向下游东北制药制剂产业链延伸,形成难以复制的综合竞争优势。所以东北制药可以在2020年营业收入降到73.844亿元的情况下顶住压力,到2022年时营业收入升至历史最高88.5089亿元。

(3)东阿阿胶是一家著名的阿胶制作公司,并列为山东非物质保护遗产代表项目。现拥有旗舰店及直营店共计75家,其在2018营业收入达到73.394亿元,但2019年随之而来下降到24.589亿元。截止到2022年营业收入为40.418亿元,。主要从事医药类、保健类产品的体验式服务。店及直

(4)安徽丰原药业股份有限公司是丰原集团医药板块的上市公司,是安徽省第一家医药类上市公司,集医药研发、生产、销售于一体,属高新技术企业、国家认定企业技术中心、中国医药工业百强企业。现有53条通过GMP认证的生产线,产品涉及生物制药、化学制药、中成药、中药饮片、原料药五大领域,涵盖解热镇痛、妇儿、神经系统、心血管系统、泌尿系统、营养类、抗生素类等7大类、10余个剂型、300多个品种,拥有多个一、二类新药品种和多项产品自主知识产权,连锁药房全省近500家。所以丰原药业稳步提升,2022年营业收入为40.04亿元。

医药制造业的第二梯队有派林生物和海南海药。派斯双林生物制药股份有限公司属医药生产公司。主要经营单采血浆的生产采集、储存和销售;血液收购,血液生物制品及保健康复制品的研究、开发、生产销售;药膏、医用生物材料、医疗器械的开发、销售等。海南海药股份有限公司(简称海药)创立于1965年,前身为海口市制药厂。1992年改制为股份公司,1994年在深圳证券交易所上市(股票代码:000566)。海南海药已拥有130个药品批准文号,常年在产产品品规大概六十余个,产品涵盖抗生素类、消化类、抗病毒类、抗肿瘤类、免疫调节类、心脑血管类等领域,已有数个优势品种药品进入了省、国家医保目录。这两个公司都是以研究生物制药为主的公司。派林生物在过去的十年间一直稳步增长,2022年已增长至24.052亿元。而海南海药则是起起伏伏,2018年最高点为24.718亿元,到2022年下降至17.791亿元。

第三梯队的公司有万泽股份、启迪药业、四环生物和丽珠集团。这几家公司,公司2022年的营业收入都处于10亿元以下,都是2013年以前就已经上市的公司,但是近十年发展速度缓慢。

根据行业内营业收入横向对比图,我们可以更加直观地看到云南白药的营业收入始终处于行业领先地位,大体量领先其他的公司。而后的东北制药也不甘落后,东阿阿胶和丰原药业也是不相上下。