irpas技术客

Python截胡修改scrapy-redis适应动态redis_key,自由拼接url!!_鹏神哥哥

大大的周 5139

能看到这篇文章的人想必是有一定了解scrapy的人,但是由于redis_key非动态性以及不符合业务的url拼接的原因,导致scrapy_redis对于某些业务非常不顺手,甚至不适应业务!!但是!!通过截胡修改源码的方式能够使得redis_key动态变化,并且url能够自由拼接~~必须点赞!!

一、咱们先来看看框架的简介

scrapy-redis是scrapy框架基于redis数据库的组件,用于scrapy项目的分布式开发和部署。

有如下特征:

1、分布式爬取

您可以启动多个spider工程,相互之间共享单个redis的requests队列。最适合广泛的多个域名网站的内容爬取。

2、分布式数据处理

爬取到的scrapy的item数据可以推入到redis队列中,这意味着你可以根据需求启动尽可能多的处理程序来共享item的队列,进行item数据持久化处理

3、Scrapy即插即用组件

Scheduler调度器 + Duplication复制 过滤器,Item Pipeline,基本spider

4、scrapy-redis架构

二、修改make_request_from_data方法

由于我的业务使用了很多redis,所以代码会比较多看起来会比较繁荣,但是都是有存在必要的,这个方法主要是为了拼接url以及获取到任务需要的参数

def make_request_from_data(self, data): """ 重写make_request_from_data方法,data是scrapy-redis读取redis中的"'关键词||大词id||小词id'",然后发送get请求 :param data: redis中消息队列数据,是一个string :yield: 一个Request对象   """ # 为了常启动!! if data == 1: return "等待任务" article_page = self.article_page #这里取出他的小词,大词id,小词id small_word = data.split("||")[0] big_id = data.split("||")[1] small_id = data.split("||")[2] smallinfo = [(small_id, big_id, small_word, datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))] #这里入一下库 self.cur.executemany(f"insert into {self.sql_dbname}.s_word(id, bid,s_keyword,create_time) values(%s,%s,%s,%s)",smallinfo) # 批量插入 self.conn.commit() for page in range(article_page): # 这里控制无效页数,如果有标记就跳出 info = self.redis_db2.sismember(f"{self.sql_dbname}_None", f"No infos:{str(small_id)}") if info: self.redis_db2.srem(f"{self.sql_dbname}_None", f"No infos:{str(small_id)}") print("删除并break") break url1 = f"https://·/products/{small_word}.html?IndexArea=product_es&page={page}" response1 = scrapy.Request(url=url1,meta={"big_id":big_id,"db_name": self.sql_dbname, "small_id": small_id, "small_word": small_word, "db_id": self.db_id},callback=self.parse) # , dont_filter=True # 把大词ID也带上扔过去 yield response1 三、修改next_requests方法

这里是重点,为了适应动态的redis_key,从而修改的方法,也是本文的重中之重!!大家仔细看注释,一定要有耐心!!

def next_requests(self): """Returns a request to be scheduled or none.""" use_set = self.settings.getbool('REDIS_START_URLS_AS_SET', defaults.START_URLS_AS_SET) fetch_one = self.server.spop if use_set else self.server.lpop # print(fetch_one, use_set, '看看情况') # XXX: Do we need to use a timeout here? found = 0 #这里拿到redis中的key,为redis_key做准备!!! for key in self.redis_db2.keys(): #这是我key的一个判断!! if "_task_info" in key: # 获取到mysql数据库名,之后有用 db_name = key.rsplit('_', 2)[0] # 初始化两个变量 pd, task_info = 0, '' for key1 in self.redis_db1.keys(): if f'{db_name}_task_start' == key1: pd = 1 # 这里做一个判断!!如果任务还没生成完会有问题,进度会不对! if pd == 0: self.sql_dbname = db_name # 这里拿到总数,扔进reids里面去!!进度!! total = self.redis_db2.llen(f"{db_name}_task_start") cc = list(self.redis_db2.smembers(f"{db_name}_task_state")) if total and not cc: totalinfo = {"total": total} self.redis_db2.sadd(f"{db_name}_task_state", json.dumps(totalinfo)) # 获取列表的长度 self.Bigworlen = self.redis_db2.llen(f"{db_name}_task_start") # 拿到集合中任务信息 # task_info = json.loads(self.redis_db2.spop(key)) # 拿到集合中任务信息 task_info = json.loads(list(self.redis_db2.smembers(key))[0]) if task_info: # task_info={"article_engines": "Bing", "article_thread": 1, "article_page": 1} article_engines = task_info["article_engines"] # print(article_engines) if article_engines == "Alibaba": # 拿到页数信息 self.article_page = task_info["article_page"] # 拿到数据库id self.db_id = task_info["db_id"] #这里就是redis_key,经过重重判断!!但是他是动态的!!! self.redis_key = f"{self.sql_dbname}_task_start" self.redis_batch_size = self.Bigworlen # TODO: Use redis pipeline execution. # 这里是常启动的关键,造一个对象! if not self.Bigworlen: req = self.make_request_from_data(1) return req while found < self.redis_batch_size: # 每次读取的量 #利用的动态redis_key data = fetch_one(self.redis_key) # 从redis中读取一条记录 if not data: req = self.make_request_from_data(1) return req # Queue empty. break data = bytes.decode(data) req = self.make_request_from_data(data) # 根据从redis中读取的记录,实例化一个request if req: return req found += 1 else: self.logger.debug("请求不是从数据发出的: %r") if found: self.logger.debug("Read %s requests from '%s'", found, self.redis_key) 四、修改start_requests方法

这个方法主要是拿来初始化数据库链接

def start_requests(self): """Returns a batch of start requests from redis.""" # 存一个mysql,私网 self.conn = pymysql.connect(host='r.aliyuncs.com', user='root',password='C', charset='utf8') self.cur = self.conn.cursor() # 取任务2号库~私网 self.redis_db1 = redis.Redis(host='r.aliyuncs.com', port=6379,password='8', db=1, decode_responses=True, encoding="utf-8",errors='ignore') self.redis_db2 = redis.Redis(host='r.aliyuncs.com', port=6379,password='8', db=2, decode_responses=True, encoding="utf-8",errors='ignore') self.sql_dbname, self.Bigworlen, self.article_page, self.db_id = '', 0, '', '' return self.next_requests() 五、然后就可以打包成docker镜像扔k8s了,美滋滋

当然还有不会的朋友可以加群交流,然后点个赞吧,给点动力!!~~


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #自由拼接url