素有数据挖掘领域“世界杯”之称的KDD Cup正在火热进行中,百度作为此次大赛的主办方,除了提供10,000美金特别奖,还为使用PaddlePaddle的参赛选手精心提供了KDD Cup Regular ML Track基线支持,此基线能够在Linux和单机CPU/GPU上运行,通过使用基线,参赛队伍可以更方便地进行特征工程和网络的优化,高效完成训练,并获得更好的结果。
为了让选手更方便地使用此基线,我们将为大家剖析KDD Cup Regular ML Track基线的技术特点和使用过程。
本文对应代码地址:
https://github.com/PaddlePaddle/models/tree/develop/PaddleRec/ctr/Paddle_baseline_KDD2019
阅读本文请参考GitHub代码库。
数据预处理
问题定义:
通过给定的用户query记录,展示记录,用户对出行方式点击行为等原始数据预测用户选择的出行方式(transportmode 1 - transport mode 11)。注意有一部分记录(plans)没有返回点击行为,题目要求没有点击同样需要预测。我们将问题重新定义为二分类问题。对每一个transport mode的点击行为都给出一个0到1的概率值(包括上述未点击行为,强行定义为transport mode 0),进而表示为对给定一种mode的ctr预估问题。
特征预处理:
特征预处理的作用是将原始数据转换成一条一条可以用作训练模型的实例(instances)。我们采用json格式保存instances,其中一条instance的json示例:
{ "mode_min_distance": 1, "pid": "", "weather": { "min_temp": "10", "wea": "q", "wind": "12", "max_temp": "25" }, "plan_rank": 1, "query": { "hour": "09", "weekday": "3", "d2": 40.0, "o2": 39.92, "o1": 116.43, "d1": 116.57 }, "eta_rank": 3, "label": 0, "mode_min_eta": 3, "price_rank": 3, "profile": [0], "mode_rank1": 11, "mode_rank2": 3, "mode_rank3": 4, "mode_rank4": 7, "mode_rank5": 1, "mode_max_eta": 1, "plan": { "distance": 0.13730889964270376, "price": 0.00760043431053203, "eta": 0.07021413598936856, "transport_mode": 11 }, "mode_max_distance": 7, "whole_rank": 5, "mode_max_price": 4, "mode_min_price": -1, "distance_rank": 4 }
注意每一条instance通过json直接解析成Python的dict结构(本文例子中有dict嵌套结构),我们可以自定义生成自己的特征和label供模型训练或者预测,dict中的键为特征名,值为特征值。
代码:preprocess_dense.py/preprocess.py 为训练样本生成代码, pre_test_dense.py/pre_process_test.py 为测试样本生成代码。带有dense后缀的为生成dense特征和sparse特征的处理脚本,不带后缀的为全sparse特征的处理脚本。具体选用哪种生成策略或者结合两种生成策略可自定义处理,这里仅提供大致思路,特征工程方面的拓展工作在此框架下比较方便。本文后续实现模型将以同时生成dense和sparse特征为例,详情见preprocess_dense.py。
组建模型
本文采用模型参考DeepFM论文(https://arxiv.org/abs/1703.04247),参见network_confv6, 其他基于fm&deep的更加花哨的模型组建请参考networks文件夹下其他文件,读者也可以开脑洞自定义实现,注意使用其他模型的时候需要对应修改instance喂入模型的格式。
训练模型
定义instance喂入模型格式
本文使用PaddlePaddle Dataset接口。Dataset是一个工厂类,有如下两种:
(1)QueueDataset:每轮训练从指定的文件列表里读取数据。
(2)InMemoryDataset:用户可以调用LoadIntoMemory把数据读到内存。每轮训练从内存中读取数据。
用户创建Dataset时,默认是QueueDataset(具体使用查看文档)。参见map_reader.py中实现MapDataset示例, 在用户实现的dataset类中,用户可以根据自己的特征实现任意的处理逻辑,在generate_sample()函数中定义每个instance输入到模型的格式,注意在示例中,一共通过feature_name定义了 "dense_feature", "context" + str(idx), "context_fm","label" 这四类输入格式。在local_train.py中使用dataset的示例:
dense_feature = fluid.layers.data( name="dense_feature",shape=[DIM_DENSE_FEATURE], dtype='float32') context_feature = [ fluid.layers.data(name="context"+ str(i), shape=[1], lod_level=1, dtype="int64") for i in range(0,NUM_CONTEXT_FEATURE)] context_feature_fm = fluid.layers.data( name="context_fm",shape=[1], dtype='int64', lod_level=1) label = fluid.layers.data(name='label', shape=[1],dtype='int64') print("ready to network") loss, auc_var, batch_auc_var, accuracy, predict =ctr_deepfm_dataset(dense_feature, context_feature, context_feature_fm, label, args.embedding_size,args.sparse_feature_dim) print("ready to optimize") optimizer = fluid.optimizer.SGD(learning_rate=1e-4) optimizer.minimize(loss) exe = fluid.Executor(fluid.CPUPlace()) exe.run(fluid.default_startup_program()) dataset = fluid.DatasetFactory().create_dataset() dataset.set_use_var([dense_feature] + context_feature +[context_feature_fm] + [label]) pipe_command = PYTHON_PATH + " map_reader.py%d" % args.sparse_feature_dim dataset.set_pipe_command(pipe_command) dataset.set_batch_size(args.batch_size) thread_num = 1 dataset.set_thread(thread_num)
注意set_pipe_command(pipe_command), pipe command是对原始的数据做处理,生成PaddlePaddle可以识别的格式。Dataset读取的每一行原始数据,都会用这里的命令做处理。可以是一个执行python脚本或者二进制等任意Linux命令,用户可以写自己的逻辑。pipe command生成var的顺序需要和set_user_var保持一致。
接下来就需要在pipe_command中处理数据。
在Pipe Command中处理数据
PaddlePaddle中提供了基类paddle.fluid.incubate.data_generator.MultiSlotDataGenerator,用户可以继承并实现自己的处理数据的逻辑。生成的数据需要与set_use_var中设置的顺序保持一致。具体代码参考map_reader.py中自定义处理原始数据代码,用户需实现generate_sample()函数来处理每一行数据:
def _process_line(self, line): instance = json.loads(line) """ profile =instance["profile"] len_profile = len(profile) if len_profile >= 10: user_profile_feature= profile[0:10] else: profile.extend([0]*(10-len_profile)) user_profile_feature= profile if len(profile) > 1 or (len(profile)== 1 and profile[0] != 0): for p inprofile: ifp >= 1 and p <= 65: user_profile_feature[p- 1] = 1 """ context_feature = [] context_feature_fm = [] dense_feature = [0] *self.dense_length plan = instance["plan"] for i, val inenumerate(self.dense_feature_list): dense_feature[i]= plan[val] if (instance["pid"] ==""): instance["pid"]= 0 query =instance["query"] weather_dic =instance["weather"] for fea in self.pid_list: context_feature.append([hash(fea+ str(instance[fea])) % self.hash_dim]) context_feature_fm.append(hash(fea+ str(instance[fea])) % self.hash_dim) for fea inself.query_feature_list: context_feature.append([hash(fea+ str(query[fea])) % self.hash_dim]) context_feature_fm.append(hash(fea+ str(query[fea])) % self.hash_dim) for fea inself.plan_feature_list: context_feature.append([hash(fea+ str(plan[fea])) % self.hash_dim]) context_feature_fm.append(hash(fea+ str(plan[fea])) % self.hash_dim) for fea inself.rank_feature_list: context_feature.append([hash(fea+ str(instance[fea])) % self.hash_dim]) context_feature_fm.append(hash(fea+ str(instance[fea])) % self.hash_dim) for fea inself.rank_whole_pic_list: context_feature.append([hash(fea+ str(instance[fea])) % self.hash_dim]) context_feature_fm.append(hash(fea+ str(instance[fea])) % self.hash_dim) for fea in self.weather_feature_list: context_feature.append([hash(fea+ str(weather_dic[fea])) % self.hash_dim]) context_feature_fm.append(hash(fea+ str(weather_dic[fea])) % self.hash_dim) label =[int(instance["label"])] return dense_feature, context_feature,context_feature_fm, label def generate_sample(self, line): def data_iter(): dense_feature,sparse_feature, sparse_feature_fm, label = self._process_line(line) #feature_name= ["user_profile"] feature_name= [] feature_name.append("dense_feature") for idxin self.categorical_range_: feature_name.append("context"+ str(idx)) feature_name.append("context_fm") feature_name.append("label") yieldzip(feature_name, [dense_feature] + sparse_feature + [sparse_feature_fm] +[label]) return data_iter
至此,模型定义,数据格式处理逻辑,训练流程都已确定,执行Python local_trian.py就可以开始训练了。