dimanche 12 février 2017

Dask datetime Python3 dataframe read_csv

Vote count: 0

With a csv file laid out like this

dtime,Ask,Bid,AskVolume,BidVolume 2003-08-04 00:01:06.430000,1.93273,1.93233,2400000,5100000 2003-08-04 00:01:15.419000,1.93256,1.93211,21900000,4000000 2003-08-04 00:01:18.298000,1.93240,1.93220,18700001,7600000 2003-08-04 00:01:24.950000,1.93264,1.93234,800000,600000 2003-08-04 00:01:26.073000,1.93284,1.93244,2800000,800000 2003-08-04 00:01:29.340000,1.93286,1.93246,7100000,2400000 2003-08-04 00:01:50.452000,1.93278,1.93258,4000000,4800000 2003-08-04 00:01:56.979000,1.93294,1.93244,22600000,13500000 2003-08-04 00:02:20.078000,1.93248,1.93238,3200000,5600000

Using the following code:


    import sys
    import pandas as pd
    import numpy as np
    import json
    import psycopg2 as pg
    import pandas.io.sql as psql
    import dask
    import dask.dataframe as dd
    import datetime as dt
    from sklearn.cluster import MeanShift, estimate_bandwidth
    from sklearn.externals.joblib import parallel
def parse_dates(df):
  return pd.to_datetime(df['dtime'], format = '%Y-%m-%d %H:%M:%S.%f')

def main():
    meta = ('time', pd.Timestamp)
    dask.set_options(get=dask.multiprocessing.get)
    print (dt.datetime.now().strftime("%A, %d. %B %Y %I:%M:%S%p"),"Doing Start of Processing CSV")
    df = dd.read_csv('/zdb1/trading/tick_data/GBPJPY.csv', sep=',', names=['dtime', 'Ask', 'Bid', 'AskVolume', 'BidVolume'],)
    print (dt.datetime.now().strftime("%A, %d. %B %Y %I:%M:%S%p"),"Done...reading CSV and above datetime")
    df.map_partitions(parse_dates, meta=meta).compute()
    print (dt.datetime.now().strftime("%A, %d. %B %Y %I:%M:%S%p"),"Done...finished datetime index above grouped")
    grouped_data = df.dropna()
    ticks_data = grouped_data['Ask'].resample('24H').ohlc()

    print (dt.datetime.now().strftime("%A, %d. %B %Y %I:%M:%S%p"),"Done...grouped_data.resample")
    sell_data = grouped_data.as_matrix(columns=['Ask']).compute()

    print (dt.datetime.now().strftime("%A, %d. %B %Y %I:%M:%S%p"),"Done...grouped_data.as_matrix")
    bandwidth = estimate_bandwidth(sell_data, quantile=0.1, n_samples=100).compute()
    ms = MeanShift(bandwidth=bandwidth, bin_seeding=True, n_jobs=-1)

    print (dt.datetime.now().strftime("%A, %d. %B %Y %I:%M:%S%p"),"Done...MeanShift setup")
    ms.fit(sell_data).compute()
    print (dt.datetime.now().strftime("%A, %d. %B %Y %I:%M:%S%p"),"Done...MeanShift fit")

    ml_results = []
    for k in range(len(np.unique(ms.labels_))):
        my_members = ms.labels_ == k
        values = sell_data[my_members, 0]

        ml_results.append(min(values))
        ml_results.append(max(values))

    print (dt.datetime.now().strftime("%A, %d. %B %Y %I:%M:%S%p"),"Done...MeanShift for k")
    ticks_data.to_json('ticks.json', date_format='iso', orient='index')

    print (dt.datetime.now().strftime("%A, %d. %B %Y %I:%M:%S%p"),"Done...ticks_data.to_json")
    with open('ml_results.json', 'w') as f:
        f.write(json.dumps(ml_results))

    print (dt.datetime.now().strftime("%A, %d. %B %Y %I:%M:%S%p"),"Done...Closing all connections")

if __name__ == "__main__":
    main()

I get date error and I do not understand why. Maybe someone would be kind and point out the error and how to fix it where the code would run! Something about dask I am not understanding here.


    # clear ; python3.5 wtf.py
    Sunday, 12. February 2017 08:36:51PM Doing Start of Processing CSV
    Sunday, 12. February 2017 08:36:53PM Done...reading CSV and above datetime
    /usr/local/lib/python3.5/site-packages/dask/async.py:245: DtypeWarning: Columns (1,2,3,4) have mixed types. Specify dtype option on import or set low_memory=False.
      return [_execute_task(a, cache) for a in arg]
    Traceback (most recent call last):
      File "wtf.py", line 56, in 
        main()
      File "wtf.py", line 22, in main
        df.map_partitions(parse_dates, meta=meta).compute()
      File "/usr/local/lib/python3.5/site-packages/dask/base.py", line 79, in compute
        return compute(self, **kwargs)[0]
      File "/usr/local/lib/python3.5/site-packages/dask/base.py", line 179, in compute
        results = get(dsk, keys, **kwargs)
      File "/usr/local/lib/python3.5/site-packages/dask/multiprocessing.py", line 86, in get
        dumps=dumps, loads=loads, **kwargs)
      File "/usr/local/lib/python3.5/site-packages/dask/async.py", line 493, in get_async
        raise(remote_exception(res, tb))
    dask.async.ValueError: time data 'dtime' doesn't match format specified
Traceback
---------
  File "/usr/local/lib/python3.5/site-packages/dask/async.py", line 268, in execute_task
    result = _execute_task(task, data)
  File "/usr/local/lib/python3.5/site-packages/dask/async.py", line 249, in _execute_task
    return func(*args2)
  File "/usr/local/lib/python3.5/site-packages/dask/dataframe/core.py", line 3013, in apply_and_enforce
    df = func(*args, **kwargs)
  File "wtf.py", line 14, in parse_dates
    return pd.to_datetime(df['dtime'], format = '%Y-%m-%d %H:%M:%S.%f')
  File "/usr/local/lib/python3.5/site-packages/pandas/util/decorators.py", line 91, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.5/site-packages/pandas/tseries/tools.py", line 421, in to_datetime
    values = _convert_listlike(arg._values, False, format)
  File "/usr/local/lib/python3.5/site-packages/pandas/tseries/tools.py", line 413, in _convert_listlike
    raise e
  File "/usr/local/lib/python3.5/site-packages/pandas/tseries/tools.py", line 401, in _convert_listlike
    require_iso8601=require_iso8601
  File "pandas/tslib.pyx", line 2374, in pandas.tslib.array_to_datetime (pandas/tslib.c:44175)
  File "pandas/tslib.pyx", line 2503, in pandas.tslib.array_to_datetime (pandas/tslib.c:42192)

Any ideas on what is wrong here? Works fine if pandas but I cannot make it work with dask. I cannot figure out how to set the timestamp in dask for primary index!

asked 28 secs ago

Let's block ads! (Why?)



Dask datetime Python3 dataframe read_csv

Aucun commentaire:

Enregistrer un commentaire