Thursday, January 04, 2018

some notes on luigi:

luigi does not come with a built-in feature to trigger jobs, even if you're running luigid

rundeck or jenkins could help.


I use rundeck and run tasks with the local scheduler.

to retry tasks when using the local scheduler:

you'll need to combine --scheduler-retry-count=2 --scheduler-retry-delay=10 --worker-keep-alive --local-scheduler

  • retry count can also be a per-task policy
  • the default retry delay is 15 minutes, which is too long for the local scheduler
  • with the local scheduler, the worker needs to be kept alive in order to retry a task; without --worker-keep-alive, the whole run ends once a task fails.


by default all exit codes are 0 unless you define them in your luigi.cfg:

[retcode]
already_running=10
missing_data=20
not_run=25
task_failed=30
scheduling_error=35
unhandled_exception=40

when a task fails, rundeck marks it as successful since it receives exit code 0

I'll just add --retcode-task-failed=1 to solve the problem.


to avoid passing too many arguments, you can use a luigi.cfg or pass them to luigi.run()

LUIGI_CONFIG_PATH can be used to point at a per-task config file:

LUIGI_CONFIG_PATH=task1.cfg luigi --module=task1 Task1 --local-scheduler
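A per-task config file like task1.cfg could then carry the retry settings instead of command-line flags; a sketch using the same values as above:

```
[scheduler]
retry_count = 2
retry_delay = 10

[worker]
keep_alive = true
```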

or add this to the end of the file:

if __name__ == '__main__':
    run_params = ['Task1',
        '--scheduler-retry-count=2',
        '--scheduler-retry-delay=10',
        '--worker-keep-alive',
        '--local-scheduler']

    luigi.run(run_params)

then you can execute the python script directly, which runs with the pre-defined options

or run with the normal luigi --module=... for a different set of options

note: in order for the --retcode-xxx arguments to take effect, you need to run with luigi.retcodes.run_with_retcodes(run_params) instead of luigi.run()


for some table-insert tasks, I'll implement a complete() method to check whether the data was inserted before:

    def complete(self):
        ... select count(*) from table where last_insert_date='xxx' ...
        return bool

luigi.contrib has database Targets, however, they just check whether the table exists.
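A runnable sketch of that kind of check, using sqlite3 and hypothetical table/column names (in luigi this logic would live inside the task's complete()):

```python
import sqlite3


def already_inserted(conn, insert_date):
    # the complete() check: were rows for this date inserted before?
    (count,) = conn.execute(
        "SELECT COUNT(*) FROM events WHERE last_insert_date = ?",
        (insert_date,),
    ).fetchone()
    return count > 0
```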


pandas: I got a unicode error on dataframe.to_csv()

finally it was solved by adding ENV LANG=C.UTF-8 LC_ALL=C.UTF-8 to the Dockerfile

Saturday, January 06, 2018

to access a database via a bastion machine, use an ssh tunnel:

ssh -N -L 3366:database-host:3306 bastion-host

or add to .ssh/config:

Host bastion-host
  HostName bastion-ip
  LocalForward 3366 database-host:3306

then simply use ssh -N bastion-host to set up the tunnel

some clojure things to try out:

$ lein jupyter install-kernel
$ lein jupyter notebook

Tuesday, January 09, 2018

Rich Hickey wrote about Git Deps for Clojure; I haven't had time to read the Deps and CLI Guide yet.

it's definitely an implementation of the ideas from his Spec-ulation talk


the biggest news recently is Meltdown and Spectre


Papers I've Read in 2017

the first paper, and the writer's favorite, is Out of the Tar Pit / Ben Moseley, Peter Marks / 2006 (PDF); I was actually reading it as well. David Nolen mentioned it in his talk: A Practical Functional Relational Architecture

it was also reviewed by the morning paper: Out of the Tar Pit


I also watched a few interesting talks:

I've always wanted to try creating a k8s cluster on lxd


need to test out an old code base; it requests an external api for data

the first thing I want to do is set up a proxy to cache the external api responses, and also to monitor requests

since those are POST requests, it may be a bit difficult for varnish

nginx is simpler:

http {
    client_max_body_size 50k;

    proxy_cache_path /tmp/cache levels=1:2 keys_zone=apiCache:1m inactive=4h;
 
    server {
        listen 8080;
        server_name localhost;

        location / {
            try_files $uri @cache_backend;
        }
 
        location @cache_backend {
            proxy_pass http://external-api-host;
            proxy_cache apiCache;
            proxy_cache_methods POST;
            proxy_cache_key "$request_uri|$request_body";
            proxy_buffers 8 32k;
            proxy_buffer_size 64k;
            proxy_cache_valid 4h;
            proxy_cache_use_stale updating;
            add_header X-Cached $upstream_cache_status;
        }
    }
}

I found setting up a proxy between services quite useful; I may write a simple go program to do it.
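The caching idea in the nginx config above — key each response on "$request_uri|$request_body" — can be sketched in a few lines of Python (hypothetical names; fetch stands in for the upstream request):

```python
import hashlib


class PostCache:
    """Tiny in-memory cache keyed like nginx's "$request_uri|$request_body"."""

    def __init__(self, fetch):
        self.fetch = fetch  # callable(uri, body) -> response text
        self.store = {}

    def request(self, uri, body):
        key = hashlib.sha256((uri + "|" + body).encode()).hexdigest()
        if key in self.store:
            return self.store[key], "HIT"   # like X-Cached: HIT
        resp = self.fetch(uri, body)
        self.store[key] = resp
        return resp, "MISS"
```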

