【PA交易】BackTrader(二): 同时使用tick和K线数据

news/2024/6/30 4:16:36 标签: python, BackTrader, 数据源, tick

前言

本文是BackTrader数据源系列的中篇。

文内会省略大量上篇文章的代码内容,直接阅读会产生轻微困惑。阅读前请务必完整理解并完成了Quickstart Guide - Backtrader 并阅读和实操过本系列第一篇

【PA交易】BackTrader(一): 如何使用实时tick数据和蜡烛图-CSDN博客

回顾

上一篇文章介绍了背景和需求,同时展示了如何自定义数据源并将tick数据读取到策略中使用。

现在为了看起来更贴近实盘交易,我们将上一篇中的MyDataFeedStatic简单修改一下,修改为MyDataFeedDynamic,该类的整体实现如下:

python">
TICK_DATA_COLUMNS = ('price', 'vol', 'amount', 'ccl', 'bid', 'bidVol', 'ask', 'askVol')


class MyDataFeedDynamic(bt.feed.DataBase):
    lines = TICK_DATA_COLUMNS

    def __init__(self, data_reader: ABCDataReader):
        super(MyDataFeedDynamic, self).__init__()
        self._data_reader = data_reader

    def islive(self):
        return True

    def _load(self):
        tick = self._data_reader.read_tick()
        if tick is None:
            # exhausted all rows
            return False

        for datafield in self.getlinealiases():
            if datafield in TICK_DATA_COLUMNS:
                line = getattr(self.lines, datafield)
                line[0] = tick[datafield]

        # -------------------------------------------
        # 添加日期时间
        self.lines.datetime[0] = date2num(tick['tickdt'])

        return True

对于这个类的实现有任何困惑之处可以参考上一篇文章。相比于上篇文章中的MyDataFeedStatic,这个新的MyDataFeedDynamic有一个最大不同是使用了一个MyDataReader接口。这个接口定义了一个read_tick方法:

python">from abc import ABC, abstractmethod


class ABCDataReader(ABC):
    @abstractmethod
    def read_tick(self):
        raise NotImplemented()

这可以模拟我们分段从CSV、数据库、甚至CTP读取数据的连续过程。相信对于数据量巨大的tick数据源尤其有用。
处于演示目的,ABCDataReader的实现可以很简单,我们直接将上一篇的测试CSV读入内存,之后使用一个行指针顺序读取即可,这和MyDataFeedStatic的实现完全相同。

为方便下文叙述,将这个ABCDataReader的实现命名为MyDataReader。

合并tick为bar

下面我们开始这篇文章的主要内容。

BackTradertick转换为K线实际提供了Data Feeds - Resample - Backtrader,关于这个API,在本系列的下篇会主要介绍。但是考虑到CTP的tick不稳定性,以及我们希望实现更加灵活,我们的选择是自行合并tick为最基础的分钟线。之后在需要更大周期数据时,再去使用Resample API(见同系列下一篇)。

首先,这里给出一个简单的合并K线函数:

python">def merge_bar_from_tick(self, tick, cur_bar, period) -> {}:
	"""
	简化的合并 tick 为 K bar 的逻辑
	这个方法忽略了绝大多数的数据异常, 仅仅是为了演示目的
	"""
	
	def make_k_bar(tick, cur_bar):
		if cur_bar is None:
			return {
				"datetime": tick['tickdt'],
				'close': tick['price'],
				'low': tick['price'],
				'high': tick['price'],
				'open': tick['price'],
				'volume': tick['vol'],  # 仅仅是为了演示目的, 实际示例数据中可能并非如此
				'openinterest': tick['ccl'],
	
			}
		else:
			return {
				"datetime": cur_bar['datetime'],
				'close': tick['price'],
				'low': min(cur_bar['low'], tick['price']),
				'high': max(cur_bar['high'], tick['price']),
				'open': cur_bar['open'],
				'volume': tick['vol'],  # 仅仅是为了演示目的, 实际示例数据中可能并非如此
				'openinterest': tick['ccl'],
			}
	
	cur_bar = make_k_bar(tick, cur_bar)
	
	# 根据周期检查当前bar是否已经构成了一个完整的bar
	tick_ts = tick['tickdt'].timestamp()
	bar_ts = cur_bar["datetime"].timestamp()
	if tick_ts - bar_ts >= period * 60:
		# 当前bar已经完成,
		return {
			"ready_bar": cur_bar,
			"cur_bar": None
		}
	else:
		# 当前bar尚未完成,
		return {
			"ready_bar": cur_bar,
			"cur_bar": cur_bar
		}
	

之后我们为MyDataReader添加一些实现,首先是构造函数,改动后接收一个额外的bars_period,用于指定要合并的K bar分钟周期,

python">

class MyDataReader(ABCDataReader):
    def __init__(self, df, bar_period = 1):
        # ...

        # 当前tick所述的Kbar
        self._cur_bar = None
        # 要合并到的bar周期
        self._bar_period = bar_period
        # 已经合并完成的bar
        self._ready_bar = None

接下来,在每次读取tick的时候都额外调用一次merge_bar_from_tick方法。

python">def read_tick(self):
    # ...

    tick = # 读取 tick, 略   

    # ...

    
    result = self.merge_bar_from_tick(tick, self._cur_bar, self._bar_period)
    self._cur_bar = result['cur_bar']
    self._ready_bar = result['ready_bar']

当self._cur_bar为空时, 表明已经合成了一个新的bar。否则表示没有合成新的bar。

MyDataReader还需要提供一个获取bar的接口,从前面的代码可以看出,self._ready_bar是真正要返回的K bar。所以我们可以直接将其返回:

python">def read_bar(self, index):
    if self._ready_bars is None:
        return None
     return self._ready_bars[index]

实时bar VS 完整Bar

方法merge_bar_from_tick中每次都会将tick无条件的更新到传入的cur_bar中,如果没有传入,则新建,否则将更新他。而对于返回值:

  • cur_bar: 可能为空或者当前K线
  • ready_bar: 总是返回一个有效的K线

我们根据交易策略算法不同可能需要不同的类型,当需要实时bar的形态时,可以总是将最新的bar进行更新并且返回;如果每次都要求根据一个完整的bar去做决策,则可以返回最终已经完成的bar。

这也是之所以选择自己实现这段合bar逻辑的原因之一:更加灵活可控。

关于产品化

示例的MyDataReader逻辑极其简单,这对于初学者可能有帮助,但是距离产品化有很大距离。在实践中,作为分享,我的一些操作包括:

  • 使用队列或者一些漏斗算法进行流控
  • 对空数据(无交易或者丢包)的情况进行额外向前填充ffill
  • 对于乱序tick进行一些基于hash的自动填充等等。

同时从编程角度,协程间的协作也是很重要的点。(没有接触过协程的朋友可以观看我这篇文章:Python:浅谈迭代器、生成器与协程的演化路径-CSDN博客)

数据源MyDataFeedWithBar

前面我们给出了一个只有tick数据源,现在我们考虑同时携带tick和分钟线的数据源

因为数据模块MyDataReader已经承担了大多数工作,所以数据源类的工作并不需要太多。回顾系列文章第一篇中我提到的架构图:

基础设计架构icon-default.png?t=N7T8https://blog.csdn.net/josephus_mu/article/details/139833207MyDataFeed更多仅仅是充当一个adapter的功能,而多数数据实际来源工作应该在数据模块中完成

大多数功能的改动集中于_load方法的内部:

python">
class MyDataFeedWithBar(bt.feed.DataBase):
    lines = (('tickdt', 'price', 'vol', 'amount', 'ccl', 'bid', 'bidVol', 'ask', 'askVol'))
    #...

    def _load(self):
        tick = self._data_reader.read_tick()
        if tick is None:
            # exhausted all rows
            return False

        bar = self._data_reader.read_bar(self.__bar_index)

        for datafield in self.getlinealiases():
            if datafield == 'tickdt':
                continue
            if datafield in TICK_DATA_COLUMNS:
                line = getattr(self.lines, datafield)
                line[0] = tick[datafield]
            if datafield in BAR_DATA_COLUMNS:
                line = getattr(self.lines, datafield)
                line[0] = bar[datafield]

        # -------------------------------------------
        # 添加日期时间
        self.lines.tickdt[0] = date2num(tick['tickdt'])
        self.lines.datetime[0] = date2num(bar['datetime'])

        return True

这里依然是遍历数据源的Lines,根据Lines的别名找到tick或bars中的名称,之后根据这些名词将其绑定到Lines中。如上篇所述, OHLC类中已经默认定义了一些基础管线:

python">class OHLC(DataSeries):
    lines = ('close', 'low', 'high', 'open', 'volume', 'openinterest',)


class OHLCDateTime(OHLC):
    lines = (('datetime'),)

前文中merge_bar_from_tick中对于每个bar的字段定义也是来源于此,新的MyDataFeedWithBar不再需要重复定义这些管线。

另外,可以看到在处理时间字段时,对于tick的时间,我们使用了一个新的管线tickdt。这样做的原因是为了能够更加明确的和Kbar进行区分。而定义于OHLC类的datetime管线我们则直接绑定为分钟bar的时间。这样做的好处是指标的计算会更容易(参见Indicators - Usage - Backtrader

使用自定义数据源

首先需要将我们自定义的数据源设定为cerebro的数据,对于本文讨论的实现:

python">
df = pd.read_csv('./datas/DCE.m2501.tick.202402.csv')
df['tickdt'] = pd.to_datetime(df['tickdt'])

data_reader = dr.MyDataReader(df, 1)
tick_feed = bfeed.MyDataFeedWithBar(data_reader, 0)
cerebro.adddata(tick_feed)

cerebro.run()

在策略中,我们依然依照一般习惯去指定一些别名:

python">class TestStrategy(bt.Strategy):
    def __init__(self):
        # Ticks 字段
        self.tickdt = self.datas[0].tickdt
        self.price = self.datas[0].price
        self.vol = self.datas[0].close
        self.amount = self.datas[0].amount
        self.ccl = self.datas[0].ccl
        self.bid = self.datas[0].bid
        self.bidVol = self.datas[0].bidVol
        self.ask = self.datas[0].ask
        self.askVol = self.datas[0].askVol
        self.local_tz = get_localzone()
        # 1Min bar 字段: 使用框架标准字段
        self.datetime = self.datas[0].datetime
        self.close = self.datas[0].close
        self.low = self.datas[0].low
        self.high = self.datas[0].high
        self.open = self.datas[0].open
    

之后在运行策略时,next方法就可以很容易的同时使用tick和K线数据了:

python">def next(self):
    self.log('Tick price, %.2f' % self.price[0], self.tickdt)
    self.log(f'bar OHLC: ({self.open[0]}, {self.high[0]}, {self.low[0]}, {self.close[0]})', self.datetime)

总结

本篇总结了在BackTrader运行策略中同时使用tick和K线数据的方法。这里我们没有使用框架的Data Feeds - Resample - Backtrader方法。而是采用了将tick和最小的分钟周期绑定在同一个数据源的方式,这增加了灵活性,而且一点也不复杂,同时bar数据使用标准管线仍然支持所有框架指标的使用。

此外本文讨论了如何使用动态数据源的方式,并且处于演示目的给出了一个非常简化的示例。基于Tick的回测基本可以确定全是大数据回测,几G的数据丝毫不奇怪。所以动态读取数据几乎是必须得操作。

实际操作中,我们通常不仅仅需要看一个周期,还希望策略可以同时检测更大的周期。本系列下一篇文章将会讨论这个话题。


http://www.niftyadmin.cn/n/5534617.html

相关文章

详解 ClickHouse 的查询优化

一、单表查询 1. 使用 prewhere 替代 where prewhere 和 where 语句的作用相同,都是用来过滤数据prewhere 和 where 语句的不同在于: prewhere 只支持 MergeTree 族系列引擎的表prewhere 首先会读取指定的列数据来判断数据过滤,等待数据过滤…

GD32 串口接受异常的几个原因

前面我们介绍过GD32 485发送时出现异常的最常见原因,有小伙伴反馈想要知道GD32 串口接受异常的可能原因,今天我们就来安排。 一、波特率异常导致收发出错 我们知道,串口是异步通讯接口,通讯双方或者多方都需要工作在相同波特率下…

Kotlin基础——Typeclass

高阶类型 如在Iterable新增泛型方法时 interface Iterable<T> {fun filter(p: (T) -> Boolean): Iterable<T>fun remove(p: (T) -> Boolean): Iterable<T> filter { x -> !p(x) } }对应的List、Set实现上述方法时仍需要返回具体的类型 interfac…

Java中的反射编程实用指南

Java中的反射编程实用指南 大家好&#xff0c;我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01;今天&#xff0c;我们将深入探讨Java中的反射编程。反射是Java提供的一种强大机制&am…

DDei在线设计器-API-DDeiFile

DDeiFile DDeiFile是代表一个设计文件&#xff0c;一个文件含有多个DDeiSheet(页签)。   DDeiFile实例包含了一个文件的所有数据&#xff0c;在获取后可以通过它访问其他内容。DDeiEditor中的files属性记录了当前打开的文件列表。 一个DDeiEditor实例至少包含一个DDeiFile实例…

创客项目秀|基于XIAO ESP32S3 sense 的小型相机

在这个科技飞速发展的时代&#xff0c;DIY&#xff08;Do It Yourself&#xff09;文化正成为连接创新与日常生活的桥梁&#xff0c;今天小编给大家带来了来自麻省理工学院的Arnov Sharma 的基于XIAO ESP32S3 sense的小型相机项目&#xff0c;该相机拥有一个圆形的触摸屏幕可以…

uni-app与原生插件混合开发调试3-安卓原生插件开发调试和打包

安卓原生插件开发调试和打包 上面已经介绍了怎么安装开发和调试环境&#xff0c;接下来就是安卓原生插件的具体开发和调试步骤&#xff1a; 将uniapp前端项目的index.vue文件新增代码。代码如图所示&#xff1a; <template><view><view><text>{{titl…

【问题记录】Ubuntu提示: “E: 软件包 gcc 没有可安装候选“

Ubuntu提示: "E: 软件包 gcc 没有可安装候选" 一&#xff0c;问题现象二&#xff0c;问题原因&解决方法 一&#xff0c;问题现象 在虚拟机Ubuntu中进行安装gcc命令时报错&#xff1a;“E: 软件包 gcc 没有可安装候选”: 二&#xff0c;问题原因&解决方法 …