One Hat Cyber Team
Your IP :
216.73.216.115
Server IP :
194.44.31.54
Server :
Linux zen.imath.kiev.ua 4.18.0-553.77.1.el8_10.x86_64 #1 SMP Fri Oct 3 14:30:23 UTC 2025 x86_64
Server Software :
Apache/2.4.37 (Rocky Linux) OpenSSL/1.1.1k
PHP Version :
5.6.40
Buat File
|
Buat Folder
Eksekusi
Dir :
~
/
backup
/
oldserver
/
2
/
sbin
/
View File Name :
yum-updatesd
#!/usr/bin/python -tt # # Proof of concept yumd implementation # # This program is free software; you can redistribute it and/or modify # it under the terms of version 2 of the GNU General Public License # as published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. # # Copyright 2007 Red Hat, Inc. # Jeremy Katz <katzj@redhat.com> # # since it takes me time everytime to figure this out again, here's how to # queue a check with dbus-send. adjust appropriately for other methods # $ dbus-send --system --print-reply --type=method_call \ # --dest=edu.duke.linux.yum /Updatesd edu.duke.linux.yum.CheckNow import os, sys import string import syslog import string import subprocess from optparse import OptionParser import dbus import dbus.service import dbus.glib import gobject import gamin from yum.config import BaseConfig, Option, IntOption, ListOption, BoolOption from yum.parser import ConfigPreProcessor from ConfigParser import ConfigParser, ParsingError updateInfoDone = False updateInfo = [] helperProcess = None NM_ONLINE = 70 class UDConfig(BaseConfig): """Config format for the daemon""" run_interval = IntOption(3600) nonroot_workdir = Option("/var/tmp/yum-updatesd") emit_via = ListOption(['dbus', 'email', 'syslog']) email_to = ListOption(["root"]) email_from = Option("root") smtp_server = Option("localhost:25") use_sendmail = BoolOption(True) dbus_listener = BoolOption(True) do_update = BoolOption(False) do_download = BoolOption(False) do_download_deps = BoolOption(False) updaterefresh = IntOption(3600) syslog_facility = Option("DAEMON") syslog_level = Option("WARN") syslog_ident = Option("yum-updatesd") yum_config = Option("/etc/yum/yum.conf") # added debugging debug = BoolOption(False) def read_config(): confparser = ConfigParser() opts = UDConfig() config_file = '/etc/yum/yum-updatesd.conf' if os.path.exists(config_file): confpp_obj = ConfigPreProcessor(config_file) try: confparser.readfp(confpp_obj) except ParsingError, e: print >> sys.stderr, "Error reading config file: %s" % e sys.exit(1) opts.populate(confparser, 'main') return opts class YumDbusListener(dbus.service.Object): def __init__(self, bus_name, object_path='/Updatesd', allowshutdown = False, config = None): dbus.service.Object.__init__(self, bus_name, object_path) self.allowshutdown = allowshutdown self.yumdconfig = config def doCheck(self): checkUpdates(self.yumdconfig) return False @dbus.service.method("edu.duke.linux.yum", in_signature="") def CheckNow(self): # make updating checking asynchronous since we discover whether # or not there are updates via a callback signal anyway gobject.idle_add(self.doCheck) return "check queued" @dbus.service.method("edu.duke.linux.yum", in_signature="") def ShutDown(self): if not self.allowshutdown: return False # we have to do this in a callback so that it doesn't get # sent back to the caller gobject.idle_add(sys.exit, 0) return True @dbus.service.method("edu.duke.linux.yum", in_signature="", out_signature="a(a{ss}a{ss})") def GetUpdateInfo(self): # FIXME: this call is deprecated. things should watch for the update # info signals global updateInfoDone, updateInfo if not updateInfoDone: # FIXME: this isn't synchronous anymore. but it should be # reasonable enough given the users gobject.idle_add(self.doCheck) return [] return updateInfo def checkHelperStatus(): global helperProcess if helperProcess.poll() is not None: helperProcess = None return False return True def checkUpdates(opts, wait = False): global helperProcess if helperProcess is not None: print >> sys.stderr, "Helper process already running" return True if os.path.exists("./yum-updatesd-helper") and opts.debug: args = ["./yum-updatesd-helper", "--check"] else: args = ["/usr/libexec/yum-updatesd-helper", "--check"] bus = dbus.SystemBus() try: o = bus.get_object("org.freedesktop.NetworkManager", "/org/freedesktop/NetworkManager") if o.state() != NM_ONLINE: args.append("--network-fail") except dbus.DBusException: pass # arguments for what to do if opts.do_download: args.append("--download") if opts.do_download_deps: args.append("--deps") if opts.do_update: args.append("--apply") # how should we send notifications? if "dbus" in opts.emit_via: args.append("--dbus") if "email" in opts.emit_via: args.extend(["--email", "--email-from=%s" %(opts.email_from,), "--email-to=%s" %(string.join(opts.email_to, ","),), "--smtp-server=%s" %(opts.smtp_server)]) if opts.use_sendmail: args.append("--sendmail") if "syslog" in opts.emit_via: args.extend(["--syslog", "--syslog-level=%s" %(opts.syslog_level,), "--syslog-facility=%s" %(opts.syslog_facility,), "--syslog-ident=%s" %(opts.syslog_ident,)]) if opts.debug: args.append("--debug") print >> sys.stderr, "Going to exec: %s" %(args,) helperProcess = subprocess.Popen(args, close_fds = True) if not wait: gobject.timeout_add(1 * 1000, checkHelperStatus) return True return helperProcess.wait() def add_update(update): global updateInfo, updateInfoDone if updateInfoDone: updateInfo = [] updateInfoDone = False updateInfo.append(update) def updates_done(num): global updateInfoDone, updateInfo # if we received all the updates, we're good. otherwise, we need to # clear the info out if int(num) != len(updateInfo): updateInfo = [] else: updateInfoDone = True def updates_applied(*args): global updateInfoDone, updateInfo updateInfo = [] updateInfoDone = False def invalidate_cache(*args): global updateInfoDone, updateInfo, helperProcess if helperProcess is not None: return updateInfo = [] updateInfoDone = False def setup_watcher(): """Sets up gamin-based file watches on things we care about and makes it so they get checked every 15 seconds.""" # add some watches on directories. mon = gamin.WatchMonitor() mon.watch_directory("/var/lib/rpm", invalidate_cache) mon.watch_directory("/var/cache/yum", invalidate_cache) map(lambda x: os.path.isdir("/var/cache/yum/%s" %(x,)) and mon.watch_directory("/var/cache/yum/%s" %(x,), invalidate_cache), os.listdir("/var/cache/yum")) mon.handle_events() fd = mon.get_fd() gobject.io_add_watch(fd, gobject.IO_IN|gobject.IO_PRI, lambda x, y: mon.handle_events()) def network_state_change(newstate, opts): if int(newstate) == NM_ONLINE: checkUpdates(opts) def main(options = None): if options is None: parser = OptionParser() parser.add_option("-d", "--debug", action="store_true", default=False, dest="debug") parser.add_option("-f", "--no-fork", action="store_true", default=False, dest="nofork") parser.add_option("-o", "--oneshot", action="store_true", default=False, dest="oneshot") parser.add_option("-r", "--remote-shutdown", action="store_true", default=False, dest="remoteshutdown") (options, args) = parser.parse_args() if not options.oneshot and not options.nofork: if os.fork(): sys.exit() os.chdir("/") fd = os.open("/dev/null", os.O_RDWR) os.dup2(fd, 0) os.dup2(fd, 1) os.dup2(fd, 2) os.close(fd) syslog.openlog("yum-updatesd", 0, syslog.LOG_DAEMON) opts = read_config() if options.debug: opts.debug = True if options.oneshot: checkUpdates(opts, wait = True) sys.exit(0) try: bus = dbus.SystemBus() except dbus.DBusException, e: bus = None if opts.dbus_listener and bus is not None: name = dbus.service.BusName("edu.duke.linux.yum", bus=bus) YumDbusListener(name, allowshutdown = options.remoteshutdown, config = opts) bus.add_signal_receiver(add_update, "UpdateInfoSignal", dbus_interface="edu.duke.linux.yum") bus.add_signal_receiver(updates_done, "UpdatesAvailableSignal", dbus_interface="edu.duke.linux.yum") bus.add_signal_receiver(updates_applied, "UpdatesAppliedSignal", dbus_interface="edu.duke.linux.yum") try: if bus is not None: bus.add_signal_receiver(lambda x: network_state_change(x, opts), "StateChange", "org.freedesktop.NetworkManager") except Exception, e: pass run_interval_ms = opts.run_interval * 1000 # needs to be in ms gobject.timeout_add(run_interval_ms, checkUpdates, opts) # set up file watcher when we're idle gobject.idle_add(setup_watcher) mainloop = gobject.MainLoop() mainloop.run() if __name__ == "__main__": main()